1. Background
When running HDFS commands, we usually set environment variables first and then perform the actual operation. For example:
export HADOOP_CONF_DIR=/home/hadoop/conf/cluster1
hdfs dfs -ls hdfs://cluster1/data/xxx
When running such a command, I usually have two questions:
- How does the hdfs command load the configuration pointed to by HADOOP_CONF_DIR, and what happens once it is loaded?
- How does the Java side parse the -ls operation, i.e., how does the code flow?
This article explores both questions.
2. How HDFS Loads Environment Variables
2.1 Summary up front
- Running the hdfs command first loads the configuration and then starts the JVM.
- The hdfs command runs the hdfs-config.sh script, which in turn runs hadoop-config.sh.
- hadoop-config.sh first sources hadoop-functions.sh, then calls functions defined in it.
- It calls hadoop_find_confdir and then hadoop_exec_hadoopenv from hadoop-functions.sh: the former resolves HADOOP_CONF_DIR from the Linux environment, and the latter sources hadoop-env.sh from that directory.
- hadoop-env.sh is where operators put all of the settings that actually take effect.
2.2 The hdfs command script
First, hdfs determines the HADOOP_HOME directory, derives the libexec path from it, and sources libexec/hdfs-config.sh to start loading the configuration:
# let's locate libexec...
if [[ -n "${HADOOP_HOME}" ]]; then
HADOOP_DEFAULT_LIBEXEC_DIR="${HADOOP_HOME}/libexec"
else
bin=$(cd -P -- "$(dirname -- "${MYNAME}")" >/dev/null && pwd -P)
HADOOP_DEFAULT_LIBEXEC_DIR="${bin}/../libexec"
fi
HADOOP_LIBEXEC_DIR="${HADOOP_LIBEXEC_DIR:-$HADOOP_DEFAULT_LIBEXEC_DIR}"
HADOOP_NEW_CONFIG=true
if [[ -f "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh" ]]; then
# shellcheck source=./hadoop-hdfs-project/hadoop-hdfs/src/main/bin/hdfs-config.sh
. "${HADOOP_LIBEXEC_DIR}/hdfs-config.sh"
else
echo "ERROR: Cannot execute ${HADOOP_LIBEXEC_DIR}/hdfs-config.sh." 2>&1
exit 1
fi
Once the configuration is loaded, the script prepares to start the JVM: it calls hdfscmd_case, passing in the parsed sub-command and its arguments:
hdfscmd_case "${HADOOP_SUBCMD}" "${HADOOP_SUBCMD_ARGS[@]}"
hdfscmd_case maps the sub-command to the Java class to launch; for example, hdfs dfs ultimately runs org.apache.hadoop.fs.FsShell:
## @param CLI arguments
function hdfscmd_case
{
subcmd=$1
shift
case ${subcmd} in
dfs)
HADOOP_CLASSNAME=org.apache.hadoop.fs.FsShell
;;
    # ... other sub-commands omitted
  esac
}
This section only covers configuration loading; the FsShell parsing flow is examined in Section 3.
2.3 Executing hdfs-config.sh
This script does nothing but run hadoop-config.sh:
if [[ -n "${HADOOP_COMMON_HOME}" ]] &&
[[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh" ]]; then
. "${HADOOP_COMMON_HOME}/libexec/hadoop-config.sh"
elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh" ]]; then
. "${HADOOP_LIBEXEC_DIR}/hadoop-config.sh"
elif [ -e "${HADOOP_HOME}/libexec/hadoop-config.sh" ]; then
. "${HADOOP_HOME}/libexec/hadoop-config.sh"
else
echo "ERROR: Hadoop common not found." 2>&1
exit 1
fi
2.4 Executing hadoop-config.sh
hadoop-config.sh sources hadoop-functions.sh so that all of its functions become available:
# get our functions defined for usage later
if [[ -n "${HADOOP_COMMON_HOME}" ]] &&
[[ -e "${HADOOP_COMMON_HOME}/libexec/hadoop-functions.sh" ]]; then
# shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
. "${HADOOP_COMMON_HOME}/libexec/hadoop-functions.sh"
elif [[ -e "${HADOOP_LIBEXEC_DIR}/hadoop-functions.sh" ]]; then
# shellcheck source=./hadoop-common-project/hadoop-common/src/main/bin/hadoop-functions.sh
. "${HADOOP_LIBEXEC_DIR}/hadoop-functions.sh"
else
echo "ERROR: Unable to exec ${HADOOP_LIBEXEC_DIR}/hadoop-functions.sh." 1>&2
exit 1
fi
It then calls two functions from hadoop-functions.sh:
hadoop_find_confdir
hadoop_exec_hadoopenv
2.5 Executing hadoop-functions.sh
hadoop_find_confdir resolves HADOOP_CONF_DIR: if the variable is already set in the Linux environment it is kept; otherwise it falls back to ${HADOOP_HOME}/conf (for Hadoop 1.x compatibility) or ${HADOOP_HOME}/etc/hadoop:
function hadoop_find_confdir
{
local conf_dir
# An attempt at compatibility with some Hadoop 1.x
# installs.
if [[ -e "${HADOOP_HOME}/conf/hadoop-env.sh" ]]; then
conf_dir="conf"
else
conf_dir="etc/hadoop"
fi
export HADOOP_CONF_DIR="${HADOOP_CONF_DIR:-${HADOOP_HOME}/${conf_dir}}"
hadoop_debug "HADOOP_CONF_DIR=${HADOOP_CONF_DIR}"
}
hadoop_exec_hadoopenv sources hadoop-env.sh; the HADOOP_ENV_PROCESSED guard ensures it runs only once:
function hadoop_exec_hadoopenv
{
if [[ -z "${HADOOP_ENV_PROCESSED}" ]]; then
if [[ -f "${HADOOP_CONF_DIR}/hadoop-env.sh" ]]; then
export HADOOP_ENV_PROCESSED=true
# shellcheck source=./hadoop-common-project/hadoop-common/src/main/conf/hadoop-env.sh
. "${HADOOP_CONF_DIR}/hadoop-env.sh"
fi
fi
}
2.6 Executing hadoop-env.sh
hadoop-env.sh holds the environment settings that actually take effect; operators typically configure the Hadoop installation in this file. For example:
export HADOOP_CONF_DIR=/home/hadoop/conf/hadoop2/cluster1
export HDFS_CONF_DIR=/home/hadoop/conf/hadoop2/cluster1
# The java implementation to use. Required.
export JAVA_HOME=/home/hadoop/src/jdk-1.8.0
export HADOOP_HOME=/home/hadoop/hadoop2
# The jsvc implementation to use. Jsvc is required to run secure datanodes.
export JSVC_HOME=${HADOOP_HOME}/bin
# Extra Java CLASSPATH elements. Optional.
# export HADOOP_CLASSPATH="<extra_entries>:$HADOOP_CLASSPATH"
# The maximum amount of heap to use, in MB. Default is 1000.
export HADOOP_HEAPSIZE="${HADOOP_HEAPSIZE:-4096}"
export HADOOP_DATANODE_HEAPSIZE=-Xmx16384m
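The settings above only become visible to Java because the scripts also put HADOOP_CONF_DIR onto the JVM classpath (via hadoop_add_classpath, not shown here). A minimal sketch of the effect on the Java side, assuming a core-site.xml under that directory defines fs.defaultFS; the property value and path below are illustrative:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ConfDirDemo {
  public static void main(String[] args) throws Exception {
    // new Configuration() loads core-default.xml plus the core-site.xml found on
    // the classpath, i.e. the files under HADOOP_CONF_DIR.
    Configuration conf = new Configuration();
    System.out.println(conf.get("fs.defaultFS"));   // e.g. hdfs://cluster1, from core-site.xml
    // FileSystem.get(conf) resolves the default filesystem from that setting.
    FileSystem fs = FileSystem.get(conf);
    for (FileStatus st : fs.listStatus(new Path("/"))) {   // "/" is an illustrative path
      System.out.println(st.getPath());
    }
  }
}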
3. How Java Parses the Command
When the FsShell process starts, FsShell.run is executed. It first builds a map from every command to its Java handler class, then looks up the handler class matching the given command and runs it.
First, the CommandFactory class maintains classMap, which maps each concrete command name to its Java handler class:
public class CommandFactory extends Configured {
private Map<String, Class<? extends Command>> classMap =
new HashMap<String, Class<? extends Command>>();
private Map<String, Command> objectMap =
new HashMap<String, Command>();
}
FsShell.run drives the whole flow in three steps:
- Initialization: build the map from every command to its Java handler class.
- Look up the handler class for the given command in that map.
- Run the handler.
public int run(String argv[]) throws Exception {
  // initialize FsShell: register all commands with the factory
  init();
  String cmd = argv[0];
  // look up the handler registered for this command
  Command instance = commandFactory.getInstance(cmd);
  // run the handler with the remaining arguments
  int exitCode = instance.run(Arrays.copyOfRange(argv, 1, argv.length));
  return exitCode;
}
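For reference, the real entry point, FsShell.main, drives run() through ToolRunner; a minimal programmatic sketch (the path /data is only illustrative):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FsShell;
import org.apache.hadoop.util.ToolRunner;

public class FsShellDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // ToolRunner parses generic options and then calls FsShell.run,
    // mirroring what the hdfs dfs entry point does.
    int exitCode = ToolRunner.run(conf, new FsShell(conf), new String[] {"-ls", "/data"});
    System.exit(exitCode);
  }
}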
3.1 Initialization: building the command-to-handler map
In init(), FsShell.registerCommands hands FsCommand.class to the factory, and CommandFactory.registerCommands then invokes FsCommand.registerCommands via reflection:
protected void registerCommands(CommandFactory factory) {
// TODO: DFSAdmin subclasses FsShell so need to protect the command
// registration. This class should morph into a base class for
// commands, and then this method can be abstract
if (this.getClass().equals(FsShell.class)) {
factory.registerCommands(FsCommand.class);
}
}
public void registerCommands(Class<?> registrarClass) {
try {
registrarClass.getMethod(
"registerCommands", CommandFactory.class
).invoke(null, this);
} catch (Exception e) {
throw new RuntimeException(StringUtils.stringifyException(e));
}
}
FsCommand.registerCommands then registers every command group, which is what ultimately fills classMap in CommandFactory:
public static void registerCommands(CommandFactory factory) {
factory.registerCommands(AclCommands.class);
factory.registerCommands(CopyCommands.class);
factory.registerCommands(Count.class);
factory.registerCommands(Delete.class);
factory.registerCommands(Display.class);
factory.registerCommands(Find.class);
factory.registerCommands(FsShellPermissions.class);
factory.registerCommands(FsUsage.class);
factory.registerCommands(Ls.class);
factory.registerCommands(Mkdir.class);
factory.registerCommands(MoveCommands.class);
factory.registerCommands(SetReplication.class);
factory.registerCommands(Stat.class);
factory.registerCommands(Tail.class);
factory.registerCommands(Head.class);
factory.registerCommands(Test.class);
factory.registerCommands(TouchCommands.class);
factory.registerCommands(Truncate.class);
factory.registerCommands(SnapshotCommands.class);
factory.registerCommands(XAttrCommands.class);
}
Note that each class registered above may itself group several concrete handler classes, and it is those that actually get registered. For example, Display covers the data-reading operations and contains three commands: cat, text, and checksum; the handler class for each of them is added to CommandFactory's classMap:
class Display extends FsCommand {
public static void registerCommands(CommandFactory factory) {
factory.addClass(Cat.class, "-cat");
factory.addClass(Text.class, "-text");
factory.addClass(Checksum.class, "-checksum");
}
}
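Each factory.addClass call simply records the alias-to-class mappings; a simplified sketch of the idea (not the verbatim Hadoop source):
// inside CommandFactory: every alias name points to the same handler class
public void addClass(Class<? extends Command> clazz, String... names) {
  for (String name : names) {
    classMap.put(name, clazz);   // e.g. "-cat" -> Cat.class
  }
}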
3.2 Getting the command instance
Given a command name such as -ls, the factory looks up and instantiates the corresponding handler class, Ls.class:
public Command getInstance(String cmdName, Configuration conf) {
if (conf == null) throw new NullPointerException("configuration is null");
Command instance = objectMap.get(cmdName);
if (instance == null) {
Class<? extends Command> cmdClass = classMap.get(cmdName);
if (cmdClass != null) {
instance = ReflectionUtils.newInstance(cmdClass, conf);
instance.setName(cmdName);
instance.setCommandFactory(this);
}
}
return instance;
}
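Putting registration and lookup together, a hypothetical standalone snippet (CommandFactory and FsCommand are Hadoop-internal classes, so this is for illustration only):
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.shell.Command;
import org.apache.hadoop.fs.shell.CommandFactory;
import org.apache.hadoop.fs.shell.FsCommand;

public class LookupDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    CommandFactory factory = new CommandFactory(conf);
    // fill classMap the same way FsShell.init() does
    FsCommand.registerCommands(factory);
    // "-ls", with the leading dash, is the key used at registration time
    Command ls = factory.getInstance("-ls");
    System.out.println(ls.getClass().getName());   // expected: org.apache.hadoop.fs.shell.Ls
  }
}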
3.3 Running the command
Command.run eventually reaches Command.processPathInternal, which invokes processPath:
private void processPathInternal(PathData item) throws IOException {
processPath(item);
if (recursive && isPathRecursable(item)) {
recursePath(item);
}
postProcessPath(item);
}
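processPath is the hook that each concrete command overrides; a purely illustrative subclass (not part of Hadoop) needs nothing more than:
import java.io.IOException;
import org.apache.hadoop.fs.shell.FsCommand;
import org.apache.hadoop.fs.shell.PathData;

// hypothetical command, shown only to illustrate the template-method structure;
// Command.run walks every argument and calls processPath for each resolved PathData
class PrintPathCommand extends FsCommand {
  @Override
  protected void processPath(PathData item) throws IOException {
    System.out.println(item);   // PathData.toString() returns the path string
  }
}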
For -ls, the concrete implementation is Ls.processPath. There, getContentSummary is called to obtain summary information about the path; as shown below, the HDFS client is used to fetch the file status information:
public ContentSummary getContentSummary(Path f) throws IOException {
FileStatus status = getFileStatus(f);
if (status.isFile()) {
// f is a file
long length = status.getLen();
return new ContentSummary.Builder().length(length).
fileCount(1).directoryCount(0).spaceConsumed(length).build();
}
// f is a directory
long[] summary = {0, 0, 1};
for(FileStatus s : listStatus(f)) {
long length = s.getLen();
ContentSummary c = s.isDirectory() ? getContentSummary(s.getPath()) :
new ContentSummary.Builder().length(length).
fileCount(1).directoryCount(0).spaceConsumed(length).build();
summary[0] += c.getLength();
summary[1] += c.getFileCount();
summary[2] += c.getDirectoryCount();
}
return new ContentSummary.Builder().length(summary[0]).
fileCount(summary[1]).directoryCount(summary[2]).
spaceConsumed(summary[0]).build();
}
This concludes the analysis.