Flink Source Code Series (Starting the ApplicationMaster and Launching the YarnJobClusterEntrypoint Process) - Part 5

Previous installment: Part 4

The previous installment analyzed JobGraph generation. This installment mainly covers uploading the jar files and configuration, assembling the parameters and launch command, and submitting the application to the ResourceManager of the YARN cluster, after which the ApplicationMaster and the YarnJobClusterEntrypoint are started on a NodeManager.

Picking up from the diagram analyzed in the previous installment: JobGraph generation has been covered, so next we mainly look at how the cluster is deployed.
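
Before stepping into the source, here is a minimal sketch of how a client program could reach this code path by hand. It is illustrative only: it assumes a jobGraph built as in the previous installment and a properly configured flink-conf/Hadoop environment, and it skips the CLI plumbing that normally invokes deployJobCluster for us.

	// Hedged sketch (not the real CLI path): deploying a per-job cluster by hand.
	// Assumes jobGraph was produced as shown in the previous installment.
	Configuration configuration = GlobalConfiguration.loadConfiguration();
	YarnClusterClientFactory clientFactory = new YarnClusterClientFactory();
	try (YarnClusterDescriptor descriptor = clientFactory.createClusterDescriptor(configuration)) {
		ClusterSpecification spec = clientFactory.getClusterSpecification(configuration);
		ClusterClientProvider<ApplicationId> provider =
				descriptor.deployJobCluster(spec, jobGraph, true /* detached */);
		try (ClusterClient<ApplicationId> client = provider.getClusterClient()) {
			System.out.println("Submitted as YARN application " + client.getClusterId());
		}
	}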

1.YarnClusterDescriptor#deployJobCluster

	public ClusterClientProvider<ApplicationId> deployJobCluster(
		ClusterSpecification clusterSpecification,
		JobGraph jobGraph,
		boolean detached) throws ClusterDeploymentException {
		try {
			return deployInternal(
				clusterSpecification,
				"Flink per-job cluster",
				getYarnJobClusterEntrypoint(),
				jobGraph,
				detached);
		} catch (Exception e) {
			throw new ClusterDeploymentException("Could not deploy Yarn job cluster.", e);
		}
	}
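
A quick note on the arguments here: getYarnJobClusterEntrypoint() returns the fully qualified class name of YarnJobClusterEntrypoint, and that string later becomes the main class of the ApplicationMaster launch command assembled in setupApplicationMasterContainer (section 4 below).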

2.YarnClusterDescriptor#deployInternal

	private ClusterClientProvider<ApplicationId> deployInternal(
			ClusterSpecification clusterSpecification,
			String applicationName,
			String yarnClusterEntrypoint,
			@Nullable JobGraph jobGraph,
			boolean detached) throws Exception {

		final UserGroupInformation currentUser = UserGroupInformation.getCurrentUser();
		if (HadoopUtils.isKerberosSecurityEnabled(currentUser)) {
			boolean useTicketCache = flinkConfiguration.getBoolean(SecurityOptions.KERBEROS_LOGIN_USETICKETCACHE);

			if (!HadoopUtils.areKerberosCredentialsValid(currentUser, useTicketCache)) {
				throw new RuntimeException("Hadoop security with Kerberos is enabled but the login user " +
					"does not have Kerberos credentials or delegation tokens!");
			}
		}

		isReadyForDeployment(clusterSpecification);

		// ------------------ Check if the specified queue exists --------------------

		checkYarnQueues(yarnClient);

		// ------------------ Check if the YARN ClusterClient has the requested resources --------------

		// Create application via yarnClient
		final YarnClientApplication yarnApplication = yarnClient.createApplication();
		final GetNewApplicationResponse appResponse = yarnApplication.getNewApplicationResponse();

		Resource maxRes = appResponse.getMaximumResourceCapability();

		final ClusterResourceDescription freeClusterMem;
		try {
			freeClusterMem = getCurrentFreeClusterResources(yarnClient);
		} catch (YarnException | IOException e) {
			failSessionDuringDeployment(yarnClient, yarnApplication);
			throw new YarnDeploymentException("Could not retrieve information about free cluster resources.", e);
		}

		final int yarnMinAllocationMB = yarnConfiguration.getInt(
				YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
				YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
		if (yarnMinAllocationMB <= 0) {
			throw new YarnDeploymentException("The minimum allocation memory "
					+ "(" + yarnMinAllocationMB + " MB) configured via '" + YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB
					+ "' should be greater than 0.");
		}

		final ClusterSpecification validClusterSpecification;
		try {
			validClusterSpecification = validateClusterResources(
					clusterSpecification,
					yarnMinAllocationMB,
					maxRes,
					freeClusterMem);
		} catch (YarnDeploymentException yde) {
			failSessionDuringDeployment(yarnClient, yarnApplication);
			throw yde;
		}

		LOG.info("Cluster specification: {}", validClusterSpecification);

		final ClusterEntrypoint.ExecutionMode executionMode = detached ?
				ClusterEntrypoint.ExecutionMode.DETACHED
				: ClusterEntrypoint.ExecutionMode.NORMAL;

		flinkConfiguration.setString(ClusterEntrypoint.EXECUTION_MODE, executionMode.toString());

		ApplicationReport report = startAppMaster(
				flinkConfiguration,
				applicationName,
				yarnClusterEntrypoint,
				jobGraph,
				yarnClient,
				yarnApplication,
				validClusterSpecification);

		// print the application id for user to cancel themselves.
		if (detached) {
			final ApplicationId yarnApplicationId = report.getApplicationId();
			logDetachedClusterInformation(yarnApplicationId, LOG);
		}

		setClusterEntrypointInfoToConfig(report);

		return () -> {
			try {
				return new RestClusterClient<>(flinkConfiguration, report.getApplicationId());
			} catch (Exception e) {
				throw new RuntimeException("Error while creating RestClusterClient.", e);
			}
		};
	}
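
To summarize deployInternal: it verifies Kerberos credentials (if security is enabled) and the deployment preconditions, checks that the requested YARN queue exists, creates a new application through yarnClient, validates the requested resources against the scheduler's minimum/maximum allocation and the currently free cluster memory, records the execution mode (DETACHED or NORMAL) in the Flink configuration, hands off the heavy lifting to startAppMaster, and finally returns a provider that lazily constructs a RestClusterClient from the reported ApplicationId.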

Now let's walk through the startAppMaster step in detail.

3.YarnClusterDescriptor#startAppMaster

	private ApplicationReport startAppMaster(
			Configuration configuration,
			String applicationName,
			String yarnClusterEntrypoint,
			JobGraph jobGraph,
			YarnClient yarnClient,
			YarnClientApplication yarnApplication,
			ClusterSpecification clusterSpecification) throws Exception {

		// ------------------ Initialize the file systems -------------------------

		org.apache.flink.core.fs.FileSystem.initialize(
				configuration,
				PluginUtils.createPluginManagerFromRootFolder(configuration));

		final FileSystem fs = FileSystem.get(yarnConfiguration);

		// hard coded check for the GoogleHDFS client because its not overriding the getScheme() method.
		if (!fs.getClass().getSimpleName().equals("GoogleHadoopFileSystem") &&
				fs.getScheme().startsWith("file")) {
			LOG.warn("The file system scheme is '" + fs.getScheme() + "'. This indicates that the "
					+ "specified Hadoop configuration path is wrong and the system is using the default Hadoop configuration values."
					+ "The Flink YARN client needs to store its files in a distributed file system");
		}

		ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();

		final List<Path> providedLibDirs = Utils.getQualifiedRemoteSharedPaths(configuration, yarnConfiguration);

		final YarnApplicationFileUploader fileUploader = YarnApplicationFileUploader.from(
			fs,
			getStagingDir(fs),
			providedLibDirs,
			appContext.getApplicationId(),
			getFileReplication());

		// The files need to be shipped and added to classpath.
		Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
		for (File file : shipFiles) {
			systemShipFiles.add(file.getAbsoluteFile());
		}

		final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
		if (logConfigFilePath != null) {
			systemShipFiles.add(new File(logConfigFilePath));
		}

		// Set-up ApplicationSubmissionContext for the application

		final ApplicationId appId = appContext.getApplicationId();

		// ------------------ Add Zookeeper namespace to local flinkConfiguraton ------
		String zkNamespace = getZookeeperNamespace();
		// no user specified cli argument for namespace?
		if (zkNamespace == null || zkNamespace.isEmpty()) {
			// namespace defined in config? else use applicationId as default.
			zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
			setZookeeperNamespace(zkNamespace);
		}

		configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);

		if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
			// activate re-execution of failed applications
			appContext.setMaxAppAttempts(
					configuration.getInteger(
							YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
							YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));

			activateHighAvailabilitySupport(appContext);
		} else {
			// set number of application retries to 1 in the default case
			appContext.setMaxAppAttempts(
					configuration.getInteger(
							YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
							1));
		}

		final Set<Path> userJarFiles = new HashSet<>();
		if (jobGraph != null) {
			userJarFiles.addAll(jobGraph.getUserJars().stream().map(f -> f.toUri()).map(Path::new).collect(Collectors.toSet()));
		}

		final List<URI> jarUrls = ConfigUtils.decodeListFromConfig(configuration, PipelineOptions.JARS, URI::create);
		if (jarUrls != null && YarnApplicationClusterEntryPoint.class.getName().equals(yarnClusterEntrypoint)) {
			userJarFiles.addAll(jarUrls.stream().map(Path::new).collect(Collectors.toSet()));
		}

		// only for per job mode
		if (jobGraph != null) {
			for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
				// only upload local files
				if (!Utils.isRemotePath(entry.getValue().filePath)) {
					Path localPath = new Path(entry.getValue().filePath);
					Tuple2<Path, Long> remoteFileInfo =
							fileUploader.uploadLocalFileToRemote(localPath, entry.getKey());
					jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
				}
			}

			jobGraph.writeUserArtifactEntriesToConfiguration();
		}

		if (providedLibDirs == null || providedLibDirs.isEmpty()) {
			addLibFoldersToShipFiles(systemShipFiles);
		}

		// Register all files in provided lib dirs as local resources with public visibility
		// and upload the remaining dependencies as local resources with APPLICATION visibility.
		final List<String> systemClassPaths = fileUploader.registerProvidedLocalResources();
		final List<String> uploadedDependencies = fileUploader.registerMultipleLocalResources(
			systemShipFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
			Path.CUR_DIR,
			LocalResourceType.FILE);
		systemClassPaths.addAll(uploadedDependencies);

		// upload and register ship-only files
		// Plugin files only need to be shipped and should not be added to classpath.
		if (providedLibDirs == null || providedLibDirs.isEmpty()) {
			Set<File> shipOnlyFiles = new HashSet<>();
			addPluginsFoldersToShipFiles(shipOnlyFiles);
			fileUploader.registerMultipleLocalResources(
					shipOnlyFiles.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
					Path.CUR_DIR,
					LocalResourceType.FILE);
		}

		if (!shipArchives.isEmpty()) {
			fileUploader.registerMultipleLocalResources(
				shipArchives.stream().map(e -> new Path(e.toURI())).collect(Collectors.toSet()),
				Path.CUR_DIR,
				LocalResourceType.ARCHIVE);
		}

		// Upload and register user jars
		final List<String> userClassPaths = fileUploader.registerMultipleLocalResources(
			userJarFiles,
			userJarInclusion == YarnConfigOptions.UserJarInclusion.DISABLED
					? ConfigConstants.DEFAULT_FLINK_USR_LIB_DIR
					: Path.CUR_DIR,
			LocalResourceType.FILE);

		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
			systemClassPaths.addAll(userClassPaths);
		}

		// normalize classpath by sorting
		Collections.sort(systemClassPaths);
		Collections.sort(userClassPaths);

		// classpath assembler
		StringBuilder classPathBuilder = new StringBuilder();
		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.FIRST) {
			for (String userClassPath : userClassPaths) {
				classPathBuilder.append(userClassPath).append(File.pathSeparator);
			}
		}
		for (String classPath : systemClassPaths) {
			classPathBuilder.append(classPath).append(File.pathSeparator);
		}

		// Setup jar for ApplicationMaster
		final YarnLocalResourceDescriptor localResourceDescFlinkJar = fileUploader.uploadFlinkDist(flinkJarPath);
		classPathBuilder.append(localResourceDescFlinkJar.getResourceKey()).append(File.pathSeparator);

		// write job graph to tmp file and add it to local resource
		// TODO: server use user main method to generate job graph
		if (jobGraph != null) {
			File tmpJobGraphFile = null;
			try {
				tmpJobGraphFile = File.createTempFile(appId.toString(), null);
				try (FileOutputStream output = new FileOutputStream(tmpJobGraphFile);
					ObjectOutputStream obOutput = new ObjectOutputStream(output)) {
					obOutput.writeObject(jobGraph);
				}

				final String jobGraphFilename = "job.graph";
				configuration.setString(JOB_GRAPH_FILE_PATH, jobGraphFilename);

				fileUploader.registerSingleLocalResource(
					jobGraphFilename,
					new Path(tmpJobGraphFile.toURI()),
					"",
					LocalResourceType.FILE,
					true,
					false);
				classPathBuilder.append(jobGraphFilename).append(File.pathSeparator);
			} catch (Exception e) {
				LOG.warn("Add job graph to local resource fail.");
				throw e;
			} finally {
				if (tmpJobGraphFile != null && !tmpJobGraphFile.delete()) {
					LOG.warn("Fail to delete temporary file {}.", tmpJobGraphFile.toPath());
				}
			}
		}

		// Upload the flink configuration
		// write out configuration file
		File tmpConfigurationFile = null;
		try {
			tmpConfigurationFile = File.createTempFile(appId + "-flink-conf.yaml", null);
			BootstrapTools.writeConfiguration(configuration, tmpConfigurationFile);

			String flinkConfigKey = "flink-conf.yaml";
			fileUploader.registerSingleLocalResource(
				flinkConfigKey,
				new Path(tmpConfigurationFile.getAbsolutePath()),
				"",
				LocalResourceType.FILE,
				true,
				true);
			classPathBuilder.append("flink-conf.yaml").append(File.pathSeparator);
		} finally {
			if (tmpConfigurationFile != null && !tmpConfigurationFile.delete()) {
				LOG.warn("Fail to delete temporary file {}.", tmpConfigurationFile.toPath());
			}
		}

		if (userJarInclusion == YarnConfigOptions.UserJarInclusion.LAST) {
			for (String userClassPath : userClassPaths) {
				classPathBuilder.append(userClassPath).append(File.pathSeparator);
			}
		}

		//To support Yarn Secure Integration Test Scenario
		//In Integration test setup, the Yarn containers created by YarnMiniCluster does not have the Yarn site XML
		//and KRB5 configuration files. We are adding these files as container local resources for the container
		//applications (JM/TMs) to have proper secure cluster setup
		Path remoteYarnSiteXmlPath = null;
		if (System.getenv("IN_TESTS") != null) {
			File f = new File(System.getenv("YARN_CONF_DIR"), Utils.YARN_SITE_FILE_NAME);
			LOG.info("Adding Yarn configuration {} to the AM container local resource bucket", f.getAbsolutePath());
			Path yarnSitePath = new Path(f.getAbsolutePath());
			remoteYarnSiteXmlPath = fileUploader.registerSingleLocalResource(
				Utils.YARN_SITE_FILE_NAME,
				yarnSitePath,
				"",
				LocalResourceType.FILE,
				false,
				false).getPath();
			if (System.getProperty("java.security.krb5.conf") != null) {
				configuration.set(SecurityOptions.KERBEROS_KRB5_PATH, System.getProperty("java.security.krb5.conf"));
			}
		}

		Path remoteKrb5Path = null;
		boolean hasKrb5 = false;
		String krb5Config = configuration.get(SecurityOptions.KERBEROS_KRB5_PATH);
		if (!StringUtils.isNullOrWhitespaceOnly(krb5Config)) {
			final File krb5 = new File(krb5Config);
			LOG.info("Adding KRB5 configuration {} to the AM container local resource bucket", krb5.getAbsolutePath());
			final Path krb5ConfPath = new Path(krb5.getAbsolutePath());
			remoteKrb5Path = fileUploader.registerSingleLocalResource(
					Utils.KRB5_FILE_NAME,
					krb5ConfPath,
					"",
					LocalResourceType.FILE,
					false,
					false).getPath();
			hasKrb5 = true;
		}

		Path remotePathKeytab = null;
		String localizedKeytabPath = null;
		String keytab = configuration.getString(SecurityOptions.KERBEROS_LOGIN_KEYTAB);
		if (keytab != null) {
			boolean	localizeKeytab = flinkConfiguration.getBoolean(YarnConfigOptions.SHIP_LOCAL_KEYTAB);
			localizedKeytabPath = flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
			if (localizeKeytab) {
				// Localize the keytab to YARN containers via local resource.
				LOG.info("Adding keytab {} to the AM container local resource bucket", keytab);
				remotePathKeytab = fileUploader.registerSingleLocalResource(
					localizedKeytabPath,
					new Path(keytab),
					"",
					LocalResourceType.FILE,
					false,
					false).getPath();
			} else {
				// Assume Keytab is pre-installed in the container.
				localizedKeytabPath = flinkConfiguration.getString(YarnConfigOptions.LOCALIZED_KEYTAB_PATH);
			}
		}

		final JobManagerProcessSpec processSpec = JobManagerProcessUtils.processSpecFromConfigWithNewOptionToInterpretLegacyHeap(
			flinkConfiguration,
			JobManagerOptions.TOTAL_PROCESS_MEMORY);
		final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
				yarnClusterEntrypoint,
				hasKrb5,
				processSpec);

		// setup security tokens
		if (UserGroupInformation.isSecurityEnabled()) {
			// set HDFS delegation tokens when security is enabled
			LOG.info("Adding delegation token to the AM container.");
			List<Path> yarnAccessList = ConfigUtils.decodeListFromConfig(configuration, YarnConfigOptions.YARN_ACCESS, Path::new);
			Utils.setTokensFor(amContainer,
				ListUtils.union(yarnAccessList, fileUploader.getRemotePaths()), yarnConfiguration
			);
		}

		amContainer.setLocalResources(fileUploader.getRegisteredLocalResources());
		fileUploader.close();

		// Setup CLASSPATH and environment variables for ApplicationMaster
		final Map<String, String> appMasterEnv = new HashMap<>();
		// set user specified app master environment variables
		appMasterEnv.putAll(
			ConfigurationUtils.getPrefixedKeyValuePairs(ResourceManagerOptions.CONTAINERIZED_MASTER_ENV_PREFIX, configuration));
		// set Flink app class path
		appMasterEnv.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, classPathBuilder.toString());

		// set Flink on YARN internal configuration values
		appMasterEnv.put(YarnConfigKeys.FLINK_DIST_JAR, localResourceDescFlinkJar.toString());
		appMasterEnv.put(YarnConfigKeys.ENV_APP_ID, appId.toString());
		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, fileUploader.getHomeDir().toString());
		appMasterEnv.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, encodeYarnLocalResourceDescriptorListToString(fileUploader.getEnvShipResourceList()));
		appMasterEnv.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, getZookeeperNamespace());
		appMasterEnv.put(YarnConfigKeys.FLINK_YARN_FILES, fileUploader.getApplicationDir().toUri().toString());

		// https://github.com/apache/hadoop/blob/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/YarnApplicationSecurity.md#identity-on-an-insecure-cluster-hadoop_user_name
		appMasterEnv.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, UserGroupInformation.getCurrentUser().getUserName());

		if (localizedKeytabPath != null) {
			appMasterEnv.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, localizedKeytabPath);
			String principal = configuration.getString(SecurityOptions.KERBEROS_LOGIN_PRINCIPAL);
			appMasterEnv.put(YarnConfigKeys.KEYTAB_PRINCIPAL, principal);
			if (remotePathKeytab != null) {
				appMasterEnv.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, remotePathKeytab.toString());
			}
		}

		//To support Yarn Secure Integration Test Scenario
		if (remoteYarnSiteXmlPath != null) {
			appMasterEnv.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, remoteYarnSiteXmlPath.toString());
		}
		if (remoteKrb5Path != null) {
			appMasterEnv.put(YarnConfigKeys.ENV_KRB5_PATH, remoteKrb5Path.toString());
		}

		// set classpath from YARN configuration
		Utils.setupYarnClassPath(yarnConfiguration, appMasterEnv);

		amContainer.setEnvironment(appMasterEnv);

		// Set up resource type requirements for ApplicationMaster
		Resource capability = Records.newRecord(Resource.class);
		capability.setMemory(clusterSpecification.getMasterMemoryMB());
		capability.setVirtualCores(flinkConfiguration.getInteger(YarnConfigOptions.APP_MASTER_VCORES));

		final String customApplicationName = customName != null ? customName : applicationName;

		appContext.setApplicationName(customApplicationName);
		appContext.setApplicationType(applicationType != null ? applicationType : "Apache Flink");
		appContext.setAMContainerSpec(amContainer);
		appContext.setResource(capability);

		// Set priority for application
		int priorityNum = flinkConfiguration.getInteger(YarnConfigOptions.APPLICATION_PRIORITY);
		if (priorityNum >= 0) {
			Priority priority = Priority.newInstance(priorityNum);
			appContext.setPriority(priority);
		}

		if (yarnQueue != null) {
			appContext.setQueue(yarnQueue);
		}

		setApplicationNodeLabel(appContext);

		setApplicationTags(appContext);

		// add a hook to clean up in case deployment fails
		Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, fileUploader.getApplicationDir());
		Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
		LOG.info("Submitting application master " + appId);
		yarnClient.submitApplication(appContext);

		LOG.info("Waiting for the cluster to be allocated");
		final long startTime = System.currentTimeMillis();
		ApplicationReport report;
		YarnApplicationState lastAppState = YarnApplicationState.NEW;
		loop: while (true) {
			try {
				report = yarnClient.getApplicationReport(appId);
			} catch (IOException e) {
				throw new YarnDeploymentException("Failed to deploy the cluster.", e);
			}
			YarnApplicationState appState = report.getYarnApplicationState();
			LOG.debug("Application State: {}", appState);
			switch(appState) {
				case FAILED:
				case KILLED:
					throw new YarnDeploymentException("The YARN application unexpectedly switched to state "
							+ appState + " during deployment. \n" +
							"Diagnostics from YARN: " + report.getDiagnostics() + "\n" +
							"If log aggregation is enabled on your cluster, use this command to further investigate the issue:\n" +
							"yarn logs -applicationId " + appId);
					//break ..
				case RUNNING:
					LOG.info("YARN application has been deployed successfully.");
					break loop;
				case FINISHED:
					LOG.info("YARN application has been finished successfully.");
					break loop;
				default:
					if (appState != lastAppState) {
						LOG.info("Deploying cluster, current state " + appState);
					}
					if (System.currentTimeMillis() - startTime > 60000) {
						LOG.info("Deployment took more than 60 seconds. Please check if the requested resources are available in the YARN cluster");
					}

			}
			lastAppState = appState;
			Thread.sleep(250);
		}

		// since deployment was successful, remove the hook
		ShutdownHookUtil.removeShutdownHook(deploymentFailureHook, getClass().getSimpleName(), LOG);
		return report;
	}
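
Most of startAppMaster is spent turning files (the flink-dist jar, ship files, the serialized job graph, flink-conf.yaml, keytab/krb5 files) into YARN local resources. As a rough sketch of what each register*LocalResource call boils down to (the real logic lives in YarnApplicationFileUploader; the names below are illustrative, not Flink's exact code):

	// Illustrative only: how an already-uploaded HDFS file becomes a YARN
	// LocalResource that the NodeManager localizes into the container's
	// working directory before launch.
	Map<String, LocalResource> localResources = new HashMap<>();
	FileStatus status = fs.getFileStatus(remotePath);            // size/timestamp guard against stale copies
	LocalResource resource = Records.newRecord(LocalResource.class);
	resource.setResource(ConverterUtils.getYarnUrlFromPath(remotePath));
	resource.setSize(status.getLen());
	resource.setTimestamp(status.getModificationTime());
	resource.setType(LocalResourceType.FILE);                    // ARCHIVE for shipArchives
	resource.setVisibility(LocalResourceVisibility.APPLICATION); // PUBLIC for provided lib dirs
	localResources.put("flink-conf.yaml", resource);             // key = file name in the container cwd
	// ... later collected via amContainer.setLocalResources(localResources)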

Two calls in startAppMaster deserve a closer look: setupApplicationMasterContainer (④) and yarnClient.submitApplication (⑧). Let's start with ④.

4.YarnClusterDescriptor#setupApplicationMasterContainer

	ContainerLaunchContext setupApplicationMasterContainer(
			String yarnClusterEntrypoint,
			boolean hasKrb5,
			JobManagerProcessSpec processSpec) {
		// ------------------ Prepare Application Master Container  ------------------------------

		// respect custom JVM options in the YAML file
		String javaOpts = flinkConfiguration.getString(CoreOptions.FLINK_JVM_OPTIONS);
		if (flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS).length() > 0) {
			javaOpts += " " + flinkConfiguration.getString(CoreOptions.FLINK_JM_JVM_OPTIONS);
		}

		//krb5.conf file will be available as local resource in JM/TM container
		if (hasKrb5) {
			javaOpts += " -Djava.security.krb5.conf=krb5.conf";
		}

		// Set up the container launch context for the application master
		ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);

		final Map<String, String> startCommandValues = new HashMap<>();
		startCommandValues.put("java", "$JAVA_HOME/bin/java");

		String jvmHeapMem = JobManagerProcessUtils.generateJvmParametersStr(processSpec, flinkConfiguration);
		startCommandValues.put("jvmmem", jvmHeapMem);

		startCommandValues.put("jvmopts", javaOpts);
		startCommandValues.put("logging", YarnLogConfigUtil.getLoggingYarnCommand(flinkConfiguration));

		startCommandValues.put("class", yarnClusterEntrypoint);
		startCommandValues.put("redirects",
			"1> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.out " +
			"2> " + ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/jobmanager.err");
		String dynamicParameterListStr = JobManagerProcessUtils.generateDynamicConfigsStr(processSpec);
		startCommandValues.put("args", dynamicParameterListStr);

		final String commandTemplate = flinkConfiguration
				.getString(ConfigConstants.YARN_CONTAINER_START_COMMAND_TEMPLATE,
						ConfigConstants.DEFAULT_YARN_CONTAINER_START_COMMAND_TEMPLATE);
		final String amCommand =
			BootstrapTools.getStartCommand(commandTemplate, startCommandValues);

		amContainer.setCommands(Collections.singletonList(amCommand));

		LOG.debug("Application Master start command: " + amCommand);

		return amContainer;
	}
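
With the default template from ConfigConstants, "%java% %jvmmem% %jvmopts% %logging% %class% %args% %redirects%", the assembled AM command looks roughly like the following (an illustrative rendering; the exact memory flags and dynamic options depend on your configuration):

	$JAVA_HOME/bin/java -Xmx469762048 -Xms469762048 -XX:MaxMetaspaceSize=268435456 \
		-Dlog.file="<LOG_DIR>/jobmanager.log" -Dlog4j.configuration=file:log4j.properties \
		org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint \
		-D jobmanager.memory.off-heap.size=134217728b ... \
		1> <LOG_DIR>/jobmanager.out 2> <LOG_DIR>/jobmanager.err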

Now let's look at ⑧:

5.YarnClientImpl#submitApplication

  public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);

    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);

    int pollCount = 0;
    long startTime = System.currentTimeMillis();

    while (true) {
      try {
        YarnApplicationState state =
            getApplicationReport(applicationId).getYarnApplicationState();
        if (!state.equals(YarnApplicationState.NEW) &&
            !state.equals(YarnApplicationState.NEW_SAVING)) {
          LOG.info("Submitted application " + applicationId);
          break;
        }

        long elapsedMillis = System.currentTimeMillis() - startTime;
        if (enforceAsyncAPITimeout() &&
            elapsedMillis >= asyncApiPollTimeoutMillis) {
          throw new YarnException("Timed out while waiting for application " +
              applicationId + " to be submitted successfully");
        }

        // Notify the client through the log every 10 poll, in case the client
        // is blocked here too long.
        if (++pollCount % 10 == 0) {
          LOG.info("Application submission is not finished, " +
              "submitted application " + applicationId +
              " is still in " + state);
        }
        try {
          Thread.sleep(submitPollIntervalMillis);
        } catch (InterruptedException ie) {
          LOG.error("Interrupted while waiting for application "
              + applicationId
              + " to be successfully submitted.");
        }
      } catch (ApplicationNotFoundException ex) {
        // FailOver or RM restart happens before RMStateStore saves
        // ApplicationState
        LOG.info("Re-submit application " + applicationId + "with the " +
            "same ApplicationSubmissionContext");
        rmClient.submitApplication(request);
      }
    }

    return applicationId;
  }
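
Note that this method belongs to Hadoop's YarnClientImpl, not Flink: after the submitApplication RPC to the ResourceManager it polls until the application leaves the NEW/NEW_SAVING states, and resubmits if an RM failover lost the submission before it was persisted. For reference, a minimal standalone use of the same client API might look like this (illustrative, no Flink involved):

	// Hedged sketch of the raw YARN client API that Flink drives above.
	YarnConfiguration yarnConf = new YarnConfiguration();
	YarnClient yarnClient = YarnClient.createYarnClient();
	yarnClient.init(yarnConf);
	yarnClient.start();

	YarnClientApplication app = yarnClient.createApplication();
	ApplicationSubmissionContext ctx = app.getApplicationSubmissionContext();
	ctx.setApplicationName("demo-app");
	// ... set the AM ContainerLaunchContext, Resource capability, queue, etc. ...
	ApplicationId appId = yarnClient.submitApplication(ctx); // returns once past NEW/NEW_SAVING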

6.YarnJobClusterEntrypoint#main

	public static void main(String[] args) {
		// startup checks and logging
		EnvironmentInformation.logEnvironmentInfo(LOG, YarnJobClusterEntrypoint.class.getSimpleName(), args);
		SignalHandler.register(LOG);
		JvmShutdownSafeguard.installAsShutdownHook(LOG);

		Map<String, String> env = System.getenv();

		final String workingDirectory = env.get(ApplicationConstants.Environment.PWD.key());
		Preconditions.checkArgument(
			workingDirectory != null,
			"Working directory variable (%s) not set",
			ApplicationConstants.Environment.PWD.key());

		try {
			YarnEntrypointUtils.logYarnEnvironmentInformation(env, LOG);
		} catch (IOException e) {
			LOG.warn("Could not log YARN environment information.", e);
		}

		final Configuration dynamicParameters = ClusterEntrypointUtils.parseParametersOrExit(
			args,
			new DynamicParametersConfigurationParserFactory(),
			YarnJobClusterEntrypoint.class);
		final Configuration configuration = YarnEntrypointUtils.loadConfiguration(workingDirectory, dynamicParameters, env);

		YarnJobClusterEntrypoint yarnJobClusterEntrypoint = new YarnJobClusterEntrypoint(configuration);

		ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint);
	}

At this point the YarnJobClusterEntrypoint has been started. The deeper details, the startup of Flink's internal ResourceManager, Dispatcher, and JobManager, are left for the coming installments!

Overview

The source-code flow chart for this installment is as follows:

See you next time!
