// CliFrontend.java
/**
 * Executes the "run" action: parses the command line, builds the effective
 * configuration, packages the user program and executes it.
 *
 * <p>When the help flag is present, only the usage text for "run" is printed.
 *
 * @param args command line arguments for the run action
 * @throws Exception propagated from argument parsing, program packaging or execution
 */
protected void run(String[] args) throws Exception {
    LOG.info("Running 'run' command.");

    // Parse the arguments against the default options of the run command.
    final CommandLine cli =
            getCommandLine(CliFrontendParser.getRunCommandOptions(), args, true);

    // Help requested: print the usage text and do nothing else.
    if (cli.hasOption(HELP_OPTION.getOpt())) {
        CliFrontendParser.printHelpForRun(customCommandLines);
        return;
    }

    final CustomCommandLine activeCli = validateAndGetActiveCommandLine(checkNotNull(cli));
    final ProgramOptions programOpts = ProgramOptions.create(cli);
    final List<URL> jars = getJobJarAndDependencies(programOpts);

    final Configuration effectiveConfig =
            getEffectiveConfiguration(activeCli, cli, programOpts, jars);
    LOG.debug("Effective executor configuration: {}", effectiveConfig);

    final PackagedProgram packagedProgram = getPackagedProgram(programOpts, effectiveConfig);
    try {
        executeProgram(effectiveConfig, packagedProgram);
    } finally {
        // Always remove the libraries extracted for this program, even if execution failed.
        packagedProgram.deleteExtractedLibraries();
    }
}
/**
 * Parses {@code args} against the given command options merged with the
 * options contributed by the custom command lines.
 *
 * @param commandOptions options of the action being executed
 * @param args raw command line arguments
 * @param stopAtNonOptions forwarded unchanged to {@code CliFrontendParser.parse}
 * @return the parsed command line
 * @throws CliArgsException if the arguments cannot be parsed
 */
public CommandLine getCommandLine(final Options commandOptions, final String[] args, final boolean stopAtNonOptions) throws CliArgsException {
    final Options mergedOptions =
            CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
    final CommandLine parsed = CliFrontendParser.parse(mergedOptions, args, stopAtNonOptions);
    return parsed;
}
/**
 * Holds the command line option definitions used by the CLI frontend.
 *
 * <p>Each constant is built as {@code new Option(shortName, longName, hasArg, description)}.
 */
public class CliFrontendParser {
// -h / --help: flag without a value.
static final Option HELP_OPTION = new Option("h", "help", false,
"Show the help message for the CLI Frontend or the action.");
// -j / --jarfile: takes a value.
static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");
// -c / --class: takes a value.
static final Option CLASS_OPTION = new Option("c", "class", true,
"Class with the program entry point (\"main()\" method). Only needed if the " +
"JAR file does not specify the class in its manifest.");
// -C / --classpath: takes a value.
// NOTE(review): the description below embeds Javadoc markup ({@link ...}) inside a
// runtime string, so it is presumably shown verbatim in the help output - confirm intended.
static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
"classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
"accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple " +
"times for specifying more than one URL. The protocol must be supported by the " +
"{@link java.net.URLClassLoader}.");
// -p / --parallelism: takes a value.
public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");
// -d / --detached: flag without a value.
public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");
// -sae / --shutdownOnAttachedExit: flag without a value.
public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
"sae", "shutdownOnAttachedExit", false,
"If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
/**
 * Parses the arguments against the given options.
 *
 * <p>Only the first part of this method is visible in this excerpt: it resets the
 * parser state, creates a fresh {@code CommandLine} and feeds every argument to
 * {@link #handleToken(String)}. The remainder (including the return statement) is
 * not shown here.
 *
 * @param options the option definitions to parse against
 * @param arguments the raw command line tokens; may be {@code null}, in which case no token is handled
 * @param properties not used in the visible part of the method
 * @param stopAtNonOption stored on the parser; its effect is not visible in this excerpt
 * @throws ParseException if a token cannot be handled
 */
public CommandLine parse(Options options, String[] arguments, Properties properties, boolean stopAtNonOption)
throws ParseException
{
// Reset the per-parse state.
this.options = options;
this.stopAtNonOption = stopAtNonOption;
skipParsing = false;
currentOption = null;
// Copy of the required options of this Options instance.
// NOTE(review): raw ArrayList - the element type is not declared here.
expectedOpts = new ArrayList(options.getRequiredOptions());
// clear the data from the groups
for (OptionGroup group : options.getOptionGroups())
{
group.setSelected(null);
}
cmd = new CommandLine();
if (arguments != null)
{
// Handle the tokens one by one, in the order given.
for (String argument : arguments)
{
handleToken(argument);
}
}
/**
 * Handles a single command line token and updates the parser state.
 *
 * @param token the token to handle
 * @throws ParseException if the token cannot be handled
 */
private void handleToken(String token) throws ParseException
{
    currentToken = token;

    dispatchToken(token);

    // Forget the current option once it reports that it accepts no argument.
    if (currentOption != null && !currentOption.acceptsArg())
    {
        currentOption = null;
    }
}

/**
 * Routes the token to the first matching handler; the order of the checks matters.
 */
private void dispatchToken(String token) throws ParseException
{
    if (skipParsing)
    {
        // Parsing is switched off: the token is stored as a plain argument.
        cmd.addArg(token);
        return;
    }

    if ("--".equals(token))
    {
        // A lone "--" switches option parsing off for the remaining tokens.
        skipParsing = true;
        return;
    }

    if (currentOption != null && currentOption.acceptsArg() && isArgument(token))
    {
        // The token is a value for the current option.
        currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token));
        return;
    }

    if (token.startsWith("--"))
    {
        // Option name written with a "--" prefix.
        handleLongOption(token);
        return;
    }

    if (token.startsWith("-") && !"-".equals(token))
    {
        // Option name written with a "-" prefix (a lone "-" is excluded).
        handleShortAndLongOption(token);
        return;
    }

    handleUnknownToken(token);
}
/**
 * Handles a token that names a long option, delegating on whether the token
 * contains an '=' sign.
 *
 * @param token the command line token to handle
 * @throws ParseException if the delegate handler rejects the token
 */
private void handleLongOption(String token) throws ParseException
{
    if (token.contains("="))
    {
        // Form with '=': --L=V, -L=V, --l=V, -l=V
        handleLongOptionWithEqual(token);
    }
    else
    {
        // Form without '=': --L, -L, --l, -l
        handleLongOptionWithoutEqual(token);
    }
}
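// The parsing logic is largely the same for every case: strip the "-" or "--" prefix, then validate the option. One case is shown below as an example.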
/**
 * Handles a long option token that contains no '=' sign.
 *
 * <p>No matching option: the current token is passed to the unknown-token handler.
 * More than one match: the token is ambiguous. Exactly one match: the option is handled.
 *
 * @param token the command line token to handle
 * @throws ParseException if the token is ambiguous or handling it fails
 */
private void handleLongOptionWithoutEqual(String token) throws ParseException
{
    // Look up the options this token could refer to.
    final List<String> candidates = options.getMatchingOptions(token);

    if (candidates.size() > 1)
    {
        throw new AmbiguousOptionException(token, candidates);
    }

    if (candidates.isEmpty())
    {
        handleUnknownToken(currentToken);
        return;
    }

    // Exactly one match: record the option on the command being built.
    handleOption(options.getOption(candidates.get(0)));
}
/**
 * Returns the long option names matching the given name.
 *
 * <p>Leading hyphens are stripped first. An exact match is returned on its own;
 * otherwise every long option name that starts with the given name is returned.
 *
 * @param opt the option name, with or without leading hyphens
 * @return the matching long option names; empty if nothing matches
 */
public List<String> getMatchingOptions(String opt)
{
    // Drop the "-" or "--" prefix.
    final String name = Util.stripLeadingHyphens(opt);

    // for a perfect match return the single option only
    if (longOpts.containsKey(name))
    {
        return Collections.singletonList(name);
    }

    // Otherwise collect every long option the name is a prefix of.
    final List<String> prefixMatches = new ArrayList<String>();
    for (String candidate : longOpts.keySet())
    {
        if (candidate.startsWith(name))
        {
            prefixMatches.add(candidate);
        }
    }
    return prefixMatches;
}
/**
 * Adds a copy of the given option to the command line being built and makes it
 * the current option when it takes an argument.
 *
 * @param option the option definition that was matched
 * @throws ParseException if the required-argument check or the required-option update fails
 */
private void handleOption(Option option) throws ParseException
{
    // check the previous option before handling the next one
    checkRequiredArgs();

    // Work on a clone rather than on the option instance that was passed in.
    final Option matched = (Option) option.clone();
    updateRequiredOptions(matched);
    cmd.addOption(matched);

    // Only an option that takes an argument stays current.
    currentOption = matched.hasArg() ? matched : null;
}