0
点赞
收藏
分享

微信扫一扫

flink学习解析输入参数

佃成成成成 2022-03-31 阅读 34

CliFrontend.java

protected void run(String[] args) throws Exception {
LOG.info("Running 'run' command.");

//获取默认的运行参数
final Options commandOptions = CliFrontendParser.getRunCommandOptions();
//解析参数,返回commandLine
final CommandLine commandLine = getCommandLine(commandOptions, args, true);

// evaluate help flag
if (commandLine.hasOption(HELP_OPTION.getOpt())) {
CliFrontendParser.printHelpForRun(customCommandLines);
return;
}

final CustomCommandLine activeCommandLine =
validateAndGetActiveCommandLine(checkNotNull(commandLine));

final ProgramOptions programOptions = ProgramOptions.create(commandLine);

final List<URL> jobJars = getJobJarAndDependencies(programOptions);

final Configuration effectiveConfiguration = getEffectiveConfiguration(
activeCommandLine, commandLine, programOptions, jobJars);

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);

try {
executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}
public CommandLine getCommandLine(final Options commandOptions, final String[] args, final boolean stopAtNonOptions) throws CliArgsException {
final Options commandLineOptions = CliFrontendParser.mergeOptions(commandOptions, customCommandLineOptions);
return CliFrontendParser.parse(commandLineOptions, args, stopAtNonOptions);
}
public class CliFrontendParser {

static final Option HELP_OPTION = new Option("h", "help", false,
"Show the help message for the CLI Frontend or the action.");

static final Option JAR_OPTION = new Option("j", "jarfile", true, "Flink program JAR file.");

static final Option CLASS_OPTION = new Option("c", "class", true,
"Class with the program entry point (\"main()\" method). Only needed if the " +
"JAR file does not specify the class in its manifest.");

static final Option CLASSPATH_OPTION = new Option("C", "classpath", true, "Adds a URL to each user code " +
"classloader on all nodes in the cluster. The paths must specify a protocol (e.g. file://) and be " +
"accessible on all nodes (e.g. by means of a NFS share). You can use this option multiple " +
"times for specifying more than one URL. The protocol must be supported by the " +
"{@link java.net.URLClassLoader}.");

public static final Option PARALLELISM_OPTION = new Option("p", "parallelism", true,
"The parallelism with which to run the program. Optional flag to override the default value " +
"specified in the configuration.");
public static final Option DETACHED_OPTION = new Option("d", "detached", false, "If present, runs " +
"the job in detached mode");

public static final Option SHUTDOWN_IF_ATTACHED_OPTION = new Option(
"sae", "shutdownOnAttachedExit", false,
"If the job is submitted in attached mode, perform a best-effort cluster shutdown " +
"when the CLI is terminated abruptly, e.g., in response to a user interrupt, such as typing Ctrl + C.");
public CommandLine parse(Options options, String[] arguments, Properties properties, boolean stopAtNonOption)
throws ParseException
{
this.options = options;
this.stopAtNonOption = stopAtNonOption;
skipParsing = false;
currentOption = null;
expectedOpts = new ArrayList(options.getRequiredOptions());

// clear the data from the groups
for (OptionGroup group : options.getOptionGroups())
{
group.setSelected(null);
}

cmd = new CommandLine();

if (arguments != null)
{
for (String argument : arguments)
{
handleToken(argument);
}
}
private void handleToken(String token) throws ParseException
{
currentToken = token;

if (skipParsing)
{
cmd.addArg(token);
}
else if ("--".equals(token))
{
skipParsing = true;
}
else if (currentOption != null && currentOption.acceptsArg() && isArgument(token))
{
// 添加参数值
currentOption.addValueForProcessing(Util.stripLeadingAndTrailingQuotes(token));
}
else if (token.startsWith("--"))
{
// 解析 --形式的参数名
handleLongOption(token);
}
else if (token.startsWith("-") && !"-".equals(token))
{
// 解析 -形式的参数名
handleShortAndLongOption(token);
}
else
{
handleUnknownToken(token);
}

if (currentOption != null && !currentOption.acceptsArg())
{
currentOption = null;
}
}
private void handleLongOption(String token) throws ParseException
{
if (token.indexOf('=') == -1)
{
//解析 –L、-L、--l、-l形式的参数(不包含=)
handleLongOptionWithoutEqual(token);
}
else
{
// 解析 --L=V、-L=V、--l=V、-l=V形式的参数(包含=)
handleLongOptionWithEqual(token);
}
}

各种情况的解析,逻辑大体相同:去除-或--前缀,校验参数,以其中一个为例

private void handleLongOptionWithoutEqual(String token) throws ParseException
{
// 校验参数是否合法
List<String> matchingOpts = options.getMatchingOptions(token);
if (matchingOpts.isEmpty())
{
handleUnknownToken(currentToken);
}
else if (matchingOpts.size() > 1)
{
throw new AmbiguousOptionException(token, matchingOpts);
}
else
{
// 参数添加到执行命令
handleOption(options.getOption(matchingOpts.get(0)));
}
}
public List<String> getMatchingOptions(String opt)
{
// 去除 - 或 -- 前缀
opt = Util.stripLeadingHyphens(opt);

List<String> matchingOpts = new ArrayList<String>();

// for a perfect match return the single option only
if (longOpts.keySet().contains(opt))
{
return Collections.singletonList(opt);
}

for (String longOpt : longOpts.keySet())
{
if (longOpt.startsWith(opt))
{
matchingOpts.add(longOpt);
}
}

return matchingOpts;
}
private void handleOption(Option option) throws ParseException
{
// check the previous option before handling the next one
checkRequiredArgs();

option = (Option) option.clone();

updateRequiredOptions(option);

cmd.addOption(option);

if (option.hasArg())
{
currentOption = option;
}
else
{
currentOption = null;
}
}
举报

相关推荐

0 条评论