HDFS Command-Line Execution Flow Analysis

静悠 2023-11-07

1. Background

When executing an HDFS command, we usually set the environment variables first and then carry out the actual operation. For example:

export HADOOP_CONF_DIR=/home/hadoop/conf/cluster1
hdfs dfs -ls hdfs://cluster1/data/xxx

When running this script, I often have two questions:

  1. How does the hdfs command load the configuration under HADOOP_CONF_DIR, and what happens after it is loaded?
  2. How does the Java side parse the -ls operation, and how does control flow through the code?

This article explores both questions.

2. HDFS Environment Variable Loading Flow

2.1 Summary up front

  1. Running the hdfs command first loads the configuration and then starts the Java virtual machine.
  2. The hdfs command executes the hdfs-config.sh script, which in turn executes hadoop-config.sh.
  3. hadoop-config.sh first sources hadoop-functions.sh and then calls the functions defined there.
  4. It calls hadoop_find_confdir and then hadoop_exec_hadoopenv from hadoop-functions.sh: the first reads HADOOP_CONF_DIR from the Linux environment, the second executes hadoop-env.sh under that HADOOP_CONF_DIR (see the verification sketch after this list).
  5. hadoop-env.sh is where operators set all of the configuration that actually takes effect.
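
To watch this chain in action, here is a small verification sketch. It assumes Hadoop 3.x, where the --debug option enables the hadoop_debug output we will see later in hadoop_find_confdir; the paths are illustrative:

# Assumes Hadoop 3.x: --debug turns on hadoop_debug, so hadoop_find_confdir
# prints the configuration directory it picked (paths are illustrative).
export HADOOP_CONF_DIR=/home/hadoop/conf/cluster1
hdfs --debug dfs -ls / 2>&1 | grep HADOOP_CONF_DIR

# Alternatively, trace the script sourcing chain directly with bash -x.
bash -x "$(command -v hdfs)" dfs -ls / 2>&1 | grep -E 'hdfs-config|hadoop-config|hadoop-env'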

2.2 The hdfs command script

First, hdfs locates the HADOOP_HOME directory, derives the libexec path under it, and executes the hdfs-config.sh script in that libexec directory to start loading the configuration:

# let's locate libexec...
if [[ -n "${HADOOP_HOME}" ]]; then
  HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
else
  bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
  HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi

HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
HADOOP_NEW_CONFIG=true
if [[ -f "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh" ]]; then
  # shellcheck source=./hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh
  . "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh"
else
  echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hdfs-config.sh." 2>&1
  exit 1
fi

Once the configuration is loaded, the script moves on to launching the Java virtual machine. It calls the hdfscmd_case function, passing in the parsed subcommand and its arguments:

hdfscmd_case "${HADOOP_SUBCMD}" "${HADOOP_SUBCMD_ARGS[@]}"

hdfscmd_case matches the subcommand to the corresponding Java class; for example, the hdfs dfs command ultimately executes the org.apache.hadoop.fs.FsShell class:

## @param        CLI arguments
function hdfscmd_case
{
  subcmd=$1
  shift

  case ${subcmd} in
    dfs)
      HADOOP_CLASSNAME=org.apache.hadoop.fs.FsShell
    ;;
    # ... other subcommands omitted
  esac
}
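
For completeness: after hdfscmd_case sets HADOOP_CLASSNAME, the script hands off to a generic subcommand handler that finally exec's the JVM. Below is a simplified, hedged sketch of that last step, loosely modeled on hadoop_java_exec in hadoop-functions.sh (Hadoop 3.x); the real function also handles out-of-memory hooks and secure daemons:

# Simplified sketch of the final JVM launch, modeled on hadoop_java_exec
# in hadoop-functions.sh; not the verbatim function.
function hadoop_java_exec_sketch
{
  local command=$1    # e.g. "dfs"
  local class=$2      # e.g. org.apache.hadoop.fs.FsShell
  shift 2

  # Replace the shell process with the JVM; HADOOP_OPTS carries the JVM flags.
  # shellcheck disable=SC2086
  exec "${JAVA_HOME}/bin/java" "-Dproc_${command}" ${HADOOP_OPTS} "${class}" "$@"
}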

This section looks only at the configuration loading process; the FsShell parsing flow is analyzed in the next section.

2.3 Executing hdfs-config.sh

This script is only responsible for executing the hadoop-config.sh script:

if [[ -n "${HADOOP_COMMON_HOME}" ]] &&
   [[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then
  . "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"
elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
  . "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then
  . "${HADOOP_HOME}/libexec/hadoop-config.sh"
else
  echo "ERROR: Hadoop common not found." 2>&1
  exit 1
fi

2.4 Executing hadoop-config.sh

It sources the hadoop-functions.sh script to bring in all of the helper functions:

# get our functions defined for usage later
if [[ -n "${HADOOP_COMMON_HOME}" ]] &&
   [[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-functions.sh" ]]; then
  # shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
  . "${HADOOP_COMMON_HOME}/libexec/hadoop-functions.sh"
elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-functions.sh" ]]; then
  # shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
  . "${HADOOP_LIBEXEC_DIR}/hadoop-functions.sh"
else
  echo "ERROR: Unable to exec ${HADOOP_LIBEXEC_DIR}/hadoop-functions.sh." 1>&2
  exit 1
fi

It then calls two functions from hadoop-functions.sh:

hadoop_find_confdir
hadoop_exec_hadoopenv

2.5 Executing hadoop-functions.sh

hadoop_find_confdir determines the HADOOP_CONF_DIR variable: it takes the value from the Linux environment if one is set, and otherwise falls back to ${HADOOP_HOME}/etc/hadoop (or ${HADOOP_HOME}/conf for Hadoop 1.x layouts):

function hadoop_find_confdir
{
  local conf_dir

  # An attempt at compatibility with some Hadoop 1.x
  # installs.
  if [[ -e "${HADOOP_HOME}/conf/hadoop-env.sh" ]]; then
    conf_dir="conf"
  else
    conf_dir="etc/hadoop"
  fi
  export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-${HADOOP_HOME}/${conf_dir}}"

  hadoop_debug "HADOOP_CONF_DIR=${HADOOP_CONF_DIR}"
}
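
The key is the ${VAR:-default} expansion on the export line: an exported HADOOP_CONF_DIR always wins over the install-tree default. A small demo of that precedence (hypothetical paths):

# Demo of the ${VAR:-default} precedence used above (hypothetical paths).
unset HADOOP_CONF_DIR
HADOOP_HOME=/opt/hadoop
echo "${HADOOP_CONF_DIR:-${HADOOP_HOME}/etc/hadoop}"   # -> /opt/hadoop/etc/hadoop

export HADOOP_CONF_DIR=/home/hadoop/conf/cluster1
echo "${HADOOP_CONF_DIR:-${HADOOP_HOME}/etc/hadoop}"   # -> /home/hadoop/conf/cluster1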

hadoop_exec_hadoopenv executes the hadoop-env.sh script, guarding with HADOOP_ENV_PROCESSED so that it runs only once:

function hadoop_exec_hadoopenv
{
  if [[ -z "${HADOOP_ENV_PROCESSED}" ]]; then
    if [[ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]]; then
      export HADOOP_ENV_PROCESSED=true
      # shellcheck source=./hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
      . "${HADOOP_CONF_DIR}/hadoop-env.sh"
    fi
  fi
}

2.6 Executing hadoop-env.sh

hadoop-env.sh holds all of the Hadoop configuration; this file is typically where the Hadoop system is configured. For example:

export HADOOP_CONF_DIR=/home/hadoop/conf/hadoop2/cluster1
export HDFS_CONF_DIR=/home/hadoop/conf/hadoop2/cluster1

# The java implementation to use.  Required.

export JAVA_HOME=/home/hadoop/src/jdk-1.8.0

export HADOOP_HOME=/home/hadoop/hadoop2

# The jsvc implementation to use. Jsvc is required to run secure datanodes.
export JSVC_HOME=${HADOOP_HOME}/bin

# Extra Java CLASSPATH elements.  Optional.
# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"

# The maximum amount of heap to use, in MB. Default is 1000.
export HADOOP_HEAPSIZE="${HADOOP_HEAPSIZE:-4096}"

export HADOOP_DATANODE_HEAPSIZE=-Xmx16384m

3. How Java Parses the Command

When the FsShell process starts, it executes FsShell.run. This method first registers the Java class for every command, then matches the given command to its handler class, and finally runs that handler.

First, the CommandFactory class maintains a classMap, which maps each concrete command name to its Java handler class:

public class CommandFactory extends Configured {
  private Map<String, Class<? extends Command>> classMap =
    new HashMap<String, Class<? extends Command>>();

  private Map<String, Command> objectMap =
    new HashMap<String, Command>();
}

In FsShell, FsShell.run kicks off the processing. It works in three steps, shown in the simplified excerpt below:

  1. Initialize: register the Java handler class for every command into a map.
  2. Look up the Java handler class for the given command in the map.
  3. Run the handler class.

public int run(String argv[]) throws Exception {
    // initialize FsShell (builds the command-to-class map)
    init();
    String cmd = argv[0];
    // look up the Java handler class for the command
    Command instance = commandFactory.getInstance(cmd);
    // execute the handler logic
    exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
    // ... (simplified; error handling and the return value are omitted)
}
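
FsShell implements Hadoop's Tool interface, so the same entry point can also be driven programmatically. A hedged usage sketch (the class name FsShellDemo and the "/" path are illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class FsShellDemo {
  public static void main(String[] args) throws Exception {
    // ToolRunner parses generic options and then calls FsShell.run(...),
    // just as the JVM launched by the hdfs script does.
    int exitCode = ToolRunner.run(new FsShell(new Configuration()),
        new String[] {"-ls", "/"});
    System.exit(exitCode);
  }
}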

3.1 Initialization: building the command-to-handler map

In the init method, FsShell.registerCommands uses reflection to invoke the FsCommand.registerCommands method:

protected void registerCommands(CommandFactory factory) {
  // TODO: DFSAdmin subclasses FsShell so need to protect the command
  // registration.  This class should morph into a base class for
  // commands, and then this method can be abstract
  if (this.getClass().equals(FsShell.class)) {
    factory.registerCommands(FsCommand.class);
  }
}

public void registerCommands(Class<?> registrarClass) {
  try {
    registrarClass.getMethod(
        "registerCommands", CommandFactory.class
    ).invoke(null, this);
  } catch (Exception e) {
    throw new RuntimeException(StringUtils.stringifyException(e));
  }
}
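
Note the null target passed to invoke: that is how Java reflection calls a static method. A minimal self-contained demonstration of the same pattern (stand-in names, not the Hadoop classes):

import java.lang.reflect.Method;

// Minimal demo of calling a static method via reflection, mirroring
// registrarClass.getMethod("registerCommands", ...).invoke(null, this) above.
public class StaticInvokeDemo {
  static class Registrar {
    // static hook, analogous to FsCommand.registerCommands(CommandFactory)
    public static void registerCommands(StringBuilder sink) {
      sink.append("registered");
    }
  }

  public static void main(String[] args) throws Exception {
    Method m = Registrar.class.getMethod("registerCommands", StringBuilder.class);
    StringBuilder sink = new StringBuilder();
    m.invoke(null, sink); // null target because the method is static
    System.out.println(sink); // prints "registered"
  }
}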

FsCommand is then responsible for putting all of the commands and their handler classes into CommandFactory's classMap:

public static void registerCommands(CommandFactory factory) {
    factory.registerCommands(AclCommands.class);
    factory.registerCommands(CopyCommands.class);
    factory.registerCommands(Count.class);
    factory.registerCommands(Delete.class);
    factory.registerCommands(Display.class);
    factory.registerCommands(Find.class);
    factory.registerCommands(FsShellPermissions.class);
    factory.registerCommands(FsUsage.class);
    factory.registerCommands(Ls.class);
    factory.registerCommands(Mkdir.class);
    factory.registerCommands(MoveCommands.class);
    factory.registerCommands(SetReplication.class);
    factory.registerCommands(Stat.class);
    factory.registerCommands(Tail.class);
    factory.registerCommands(Head.class);
    factory.registerCommands(Test.class);
    factory.registerCommands(TouchCommands.class);
    factory.registerCommands(Truncate.class);
    factory.registerCommands(SnapshotCommands.class);
    factory.registerCommands(XAttrCommands.class);
}

Note that in the registration above, each registered Java class may bundle several handler subclasses, and it is those subclasses that actually get registered. For example, the Display class represents the data-retrieval operations and bundles three commands: cat, text, and checksum. The Java classes for all three commands are put into CommandFactory's classMap (the registered names keep the leading dash):

class Display extends FsCommand {
  public static void registerCommands(CommandFactory factory) {
    factory.addClass(Cat.class, "-cat");
    factory.addClass(Text.class, "-text");
    factory.addClass(Checksum.class, "-checksum");
  }
}

3.2 Getting the handler instance

Given a command such as -ls, the factory fetches the corresponding Java class, Ls.class:

public Command getInstance(String cmdName, Configuration conf) {
    if (conf == null) throw new NullPointerException("configuration is null");
    
    Command instance = objectMap.get(cmdName);
    if (instance == null) {
      Class<? extends Command> cmdClass = classMap.get(cmdName);
      if (cmdClass != null) {
        instance = ReflectionUtils.newInstance(cmdClass, conf);
        instance.setName(cmdName);
        instance.setCommandFactory(this);
      }
    }
    return instance;
}
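
Putting registration and lookup together, a hedged usage sketch (constructor signatures and visibility may differ slightly across Hadoop versions, and the "/" path is illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFactory;
import org.apache.hadoop.fs.shell.FsCommand;

public class FactoryLookupDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CommandFactory factory = new CommandFactory(conf);
    FsCommand.registerCommands(factory);            // fills classMap, as in 3.1
    Command ls = factory.getInstance("-ls", conf);  // reflectively instantiates Ls
    System.exit(ls.run("/"));                       // list an illustrative path
  }
}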

3.3 Executing the command

The Command.run method eventually reaches Command.processPathInternal, which invokes processPath:

private void processPathInternal(PathData item) throws IOException {
    processPath(item);
    if (recursive && isPathRecursable(item)) {
      recursePath(item);
    }
    postProcessPath(item);
}
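
This is the template-method pattern: the abstract Command base class owns the traversal skeleton (including the recursion), and each subclass overrides only processPath. A minimal self-contained sketch of the idea (stand-in types, not the Hadoop classes):

// Minimal template-method sketch mirroring Command/Ls (stand-in types).
abstract class MiniCommand {
  // the base class drives the processing of every path item...
  final void processPathInternal(String item) {
    processPath(item);
    postProcessPath(item);
  }

  // ...and subclasses supply only the per-path behavior
  protected abstract void processPath(String item);

  protected void postProcessPath(String item) { /* optional hook */ }
}

class MiniLs extends MiniCommand {
  @Override
  protected void processPath(String item) {
    System.out.println("ls: " + item);
  }
}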

What processPath executes is the concrete implementation class; for example, when the command is -ls, it is Ls's processPath that ultimately runs.


Inside Ls.processPath, the getContentSummary method is called to gather usage information. As shown below, it fetches file status information through the HDFS client:

public ContentSummary getContentSummary(Path f) throws IOException {
    FileStatus status = getFileStatus(f);
    if (status.isFile()) {
      // f is a file
      long length = status.getLen();
      return new ContentSummary.Builder().length(length).
          fileCount(1).directoryCount(0).spaceConsumed(length).build();
    }
    // f is a directory
    long[] summary = {0, 0, 1};
    for(FileStatus s : listStatus(f)) {
      long length = s.getLen();
      ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
          new ContentSummary.Builder().length(length).
          fileCount(1).directoryCount(0).spaceConsumed(length).build();
      summary[0] += c.getLength();
      summary[1] += c.getFileCount();
      summary[2] += c.getDirectoryCount();
    }
    return new ContentSummary.Builder().length(summary[0]).
        fileCount(summary[1]).directoryCount(summary[2]).
        spaceConsumed(summary[0]).build();
}
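
To exercise this path outside the shell, here is a hedged standalone example. It uses the public FileSystem client API against the local filesystem; the /tmp path is illustrative:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ContentSummaryDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();   // loads *-site.xml from HADOOP_CONF_DIR
    FileSystem fs = FileSystem.getLocal(conf);  // swap for FileSystem.get(conf) on HDFS
    ContentSummary cs = fs.getContentSummary(new Path("/tmp")); // illustrative path
    System.out.printf("files=%d dirs=%d bytes=%d%n",
        cs.getFileCount(), cs.getDirectoryCount(), cs.getLength());
  }
}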

That concludes the analysis.
