- 前文提及进入processCmd真正执行单条sql语句
- 进入该方法中查看相关的执行逻辑
2.1 刷新错误输出流(ss.err),避免本次命令的输出中混入上一个命令残留的输出
2.2 将传入的单句sql去除注释
2.3 调用tokenizeCmd按照空格、制表符切割,存入tokens数组中
2.4 判断切割出来的第一个字段是什么来决定相应的处理方式
2.4.1 如果输入的命令是quit或exit(不区分大小写),则关闭会话并退出程序
2.4.2 如果第一个字段是source,则对跟在后面的文件路径做变量替换并确认它是一个文件,最后交给processFile去执行
2.4.3 如果命令以!开头,则视为shell命令,做变量替换后调用ShellCmdExecutor执行
2.4.4 其他情况则通过CommandProcessorFactory获取对应的CommandProcessor,再交给processLocalCmd去执行(sql语句属于这种情况)
/**
 * Executes a single CLI command and reports its status code.
 *
 * <p>The command is dispatched on its comment-stripped, trimmed form:
 * <ul>
 *   <li>{@code quit} / {@code exit} (case-insensitive): closes the session and calls
 *       {@code System.exit(0)};</li>
 *   <li>first token {@code source}: the remainder is variable-substituted, checked to be a
 *       regular file and handed to {@code processFile};</li>
 *   <li>leading {@code !}: the rest is variable-substituted and run through
 *       {@code ShellCmdExecutor};</li>
 *   <li>anything else: a {@code CommandProcessor} is obtained from
 *       {@code CommandProcessorFactory} and the command is run via {@code processLocalCmd}.</li>
 * </ul>
 *
 * <p>The session thread name set by {@code updateThreadName()} is always restored through
 * {@code resetThreadName()}, including when an exception propagates out of this method.
 *
 * @param cmd the raw text of one command
 * @return 0 unless a branch reports otherwise: 1 when a branch fails with a handled error,
 *         otherwise the value returned by {@code processFile}, the shell executor or
 *         {@code processLocalCmd}
 * @throws RuntimeException wrapping any exception other than {@code SQLException} raised
 *         while obtaining, running or closing the command processor
 */
public int processCmd(String cmd) {
  CliSessionState ss = (CliSessionState) SessionState.get();
  ss.setLastCommand(cmd);
  ss.updateThreadName();
  try {
    // Push out anything still buffered on the error stream before this command runs.
    ss.err.flush();

    String trimmedCmd = HiveStringUtils.removeComments(cmd).trim();
    String[] tokens = tokenizeCmd(trimmedCmd);
    int ret = 0;

    // Shared by the "source" and "!" branches; the variables are looked up lazily, at
    // substitution time, from whatever session is current then.
    HiveVariableSource hiveVariables = new HiveVariableSource() {
      @Override
      public Map<String, String> getHiveVariable() {
        return SessionState.get().getHiveVariables();
      }
    };

    // equalsIgnoreCase is locale-independent, unlike toLowerCase() with the default locale
    // (which would not match "QUIT"/"EXIT" under e.g. a Turkish locale).
    if (trimmedCmd.equalsIgnoreCase("quit") || trimmedCmd.equalsIgnoreCase("exit")) {
      ss.close();
      System.exit(0);
    } else if (tokens[0].equalsIgnoreCase("source")) {
      String sourcePath = getFirstCmd(trimmedCmd, tokens[0].length());
      sourcePath = new VariableSubstitution(hiveVariables).substitute(ss.getConf(), sourcePath);

      File sourceFile = new File(sourcePath);
      if (!sourceFile.isFile()) {
        console.printError("File: " + sourcePath + " is not a file.");
        ret = 1;
      } else {
        try {
          ret = processFile(sourcePath);
        } catch (IOException e) {
          console.printError("Failed processing file " + sourcePath + " " + e.getLocalizedMessage(),
              stringifyException(e));
          ret = 1;
        }
      }
    } else if (trimmedCmd.startsWith("!")) {
      // NOTE(review): the shell text is taken from the raw command, not the comment-stripped
      // one. If removeComments() dropped leading text, substring(1) removes the first
      // character of that text rather than the '!' - confirm this is intended.
      String shellCmd = cmd.trim().substring(1);
      shellCmd = new VariableSubstitution(hiveVariables).substitute(ss.getConf(), shellCmd);
      try {
        ShellCmdExecutor executor = new ShellCmdExecutor(shellCmd, ss.out, ss.err);
        ret = executor.execute();
        if (ret != 0) {
          console.printError("Command failed with exit code = " + ret);
        }
      } catch (Exception e) {
        console.printError("Exception raised from Shell command " + e.getLocalizedMessage(),
            stringifyException(e));
        ret = 1;
      }
    } else {
      try (CommandProcessor proc = CommandProcessorFactory.get(tokens, (HiveConf) conf)) {
        // An IDriver receives the raw text; every other processor receives the
        // comment-stripped, trimmed command.
        if (proc instanceof IDriver) {
          ret = processLocalCmd(cmd, proc, ss);
        } else {
          ret = processLocalCmd(trimmedCmd, proc, ss);
        }
      } catch (SQLException e) {
        console.printError("Failed processing command " + tokens[0] + " " + e.getLocalizedMessage(),
            org.apache.hadoop.util.StringUtils.stringifyException(e));
        ret = 1;
      } catch (Exception e) {
        throw new RuntimeException(e);
      }
    }
    return ret;
  } finally {
    // Restore the thread name even when a branch above throws.
    ss.resetThreadName();
  }
}