1. Recap of the previous session 0:00:00 ~ 0:20:00
2. TaskManager startup 0:20:00 ~ 1:39:00
flink-daemon.sh script, line 38
CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner
TaskManagerRunner is the main class that starts the TaskManager
Two ways to start it:
----------------------- Code begins - 0:20:00 ~ 1:39:00 ---------------------
TaskManagerRunner class, main method
——》TaskManagerRunner class, runTaskManagerSecurely
——》TaskManagerRunner class, runTaskManager
——》TaskManagerRunner class, constructor
▼
// Note: thread pool
this.executor = java.util.concurrent.Executors
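// Note: HA services: ZooKeeperHaServices
// Note: initialize RpcService
// Note: initialize HeartbeatServices
// Note: initialize BlobCacheService
// Note: provides information about external resources
/* Note: start the TaskManager
 * 1. Creates the TaskExecutor, which is responsible for running multiple Tasks
 */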
taskManager = startTaskManager(this.configuration,
——》TaskManagerRunner class, startTaskManager
▼
/* Major step 1. Note: taskManagerServices = TaskManagerServices
 */
TaskManagerServices taskManagerServices = TaskManagerServices
.fromConfiguration(taskManagerServicesConfiguration, blobCacheService.getPermanentBlobService(), taskManagerMetricGroup.f1, ioExecutor,
fatalErrorHandler);
/* Major step 2. Note: create the TaskExecutor instance
 * Internally it creates two important heartbeat managers:
 * 1. JobManagerHeartbeatManager
 * 2. ResourceManagerHeartbeatManager
 */
return new TaskExecutor(rpcService, taskManagerConfiguration,
▲
----------------------- Code ends 1:39:00 ---------------------
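Step 2 above says the TaskExecutor creates two heartbeat managers, one for the JobManager and one for the ResourceManager. As a rough illustration of the idea only, here is a minimal sketch of timeout-based heartbeat monitoring. The class HeartbeatMonitorSketch, its methods, the target names, and the timeout values are all hypothetical and are not Flink's heartbeat API. Time is passed in explicitly so the example stays deterministic.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for a heartbeat manager; not Flink's implementation. */
class HeartbeatMonitorSketch {
    private final long timeoutMillis;
    // Last time (in ms) a heartbeat was seen from each monitored target.
    private final Map<String, Long> lastSeen = new HashMap<>();

    HeartbeatMonitorSketch(long timeoutMillis) {
        this.timeoutMillis = timeoutMillis;
    }

    /** Start monitoring a target, treating nowMillis as its first heartbeat. */
    void monitorTarget(String targetId, long nowMillis) {
        lastSeen.put(targetId, nowMillis);
    }

    /** Record a heartbeat; targets that are not monitored are ignored. */
    void receiveHeartbeat(String targetId, long nowMillis) {
        if (lastSeen.containsKey(targetId)) {
            lastSeen.put(targetId, nowMillis);
        }
    }

    /** Targets whose last heartbeat is older than the timeout. */
    List<String> timedOutTargets(long nowMillis) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, Long> entry : lastSeen.entrySet()) {
            if (nowMillis - entry.getValue() > timeoutMillis) {
                result.add(entry.getKey());
            }
        }
        return result;
    }
}

class HeartbeatDemo {
    public static void main(String[] args) {
        // One monitor per peer type, mirroring the two managers named in the notes.
        HeartbeatMonitorSketch jobManagers = new HeartbeatMonitorSketch(100L);
        HeartbeatMonitorSketch resourceManagers = new HeartbeatMonitorSketch(100L);
        jobManagers.monitorTarget("jm-1", 0L);
        resourceManagers.monitorTarget("rm-1", 0L);
        resourceManagers.receiveHeartbeat("rm-1", 90L);
        System.out.println(jobManagers.timedOutTargets(150L));      // prints [jm-1]
        System.out.println(resourceManagers.timedOutTargets(150L)); // prints []
    }
}
```

Keeping a separate monitor per peer type means a lost JobManager and a lost ResourceManager can be handled by different callbacks, which is one plausible reason for having two managers.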
3. TaskManager/TaskExecutor registration and heartbeat 1:50:00 ~ 3:18:00
Core entry point:
----------------------- Code begins - continues from the return new TaskExecutor code in the previous section ---------------------
TaskExecutor constructor
——》TaskExecutor onStart method
——main method》TaskExecutor#startTaskExecutorServices();
▼
/* Note: start the ResourceManagerLeaderListener
 * The TaskManager registers with the ResourceManager through the ResourceManagerLeaderListener,
 * which watches for changes of the ResourceManager leader. When a new leader is elected,
 * notifyLeaderAddress() is called to trigger a reconnect to the ResourceManager
 */
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
new ResourceManagerLeaderListener()
——》TaskExecutor#ResourceManagerLeaderListener#notifyLeaderAddress
——》TaskExecutor#reconnectToResourceManager
▼
/* Note: connect to the new ResourceManager
 */
tryConnectToResourceManager();
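The listener-driven reconnect described above can be sketched without any Flink dependency. This is a simplified illustration under stated assumptions: LeaderListenerSketch, LeaderRetrieverSketch, TaskExecutorSketch, publishLeader and connectionLog are hypothetical names introduced here. Only the notifyLeaderAddress callback name and the "close the old connection, then connect to the new leader" idea come from the notes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

/** Callback shape modeled on the notes; a simplified, hypothetical interface. */
interface LeaderListenerSketch {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId);
}

/** Hypothetical stand-in for a leader retrieval service. */
class LeaderRetrieverSketch {
    private LeaderListenerSketch listener;

    void start(LeaderListenerSketch listener) {
        this.listener = listener;
    }

    /** Simulates an election result being published to the listener. */
    void publishLeader(String address, UUID sessionId) {
        if (listener != null) {
            listener.notifyLeaderAddress(address, sessionId);
        }
    }
}

/** Hypothetical TaskExecutor that reconnects whenever the leader changes. */
class TaskExecutorSketch {
    String currentLeaderAddress;
    // Records what the executor did, so the flow can be inspected.
    final List<String> connectionLog = new ArrayList<>();

    void startServices(LeaderRetrieverSketch retriever) {
        // Register the listener; every leader change triggers a reconnect.
        retriever.start((address, sessionId) -> reconnect(address));
    }

    private void reconnect(String newAddress) {
        if (currentLeaderAddress != null) {
            connectionLog.add("close " + currentLeaderAddress);
        }
        currentLeaderAddress = newAddress;
        connectionLog.add("connect " + newAddress);
    }
}

class LeaderListenerDemo {
    public static void main(String[] args) {
        LeaderRetrieverSketch retriever = new LeaderRetrieverSketch();
        TaskExecutorSketch taskExecutor = new TaskExecutorSketch();
        taskExecutor.startServices(retriever);
        retriever.publishLeader("rm-1", UUID.randomUUID());
        retriever.publishLeader("rm-2", UUID.randomUUID());
        System.out.println(taskExecutor.connectionLog);
        // prints [connect rm-1, close rm-1, connect rm-2]
    }
}
```

The point of the callback design is that the TaskExecutor never polls for the leader; it only reacts when the retrieval service reports a change.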
——》 TaskExecutor#connectToResourceManager
▼
/* Note: build the TaskExecutorRegistration object
 */
final TaskExecutorRegistration taskExecutorRegistration = new
/* Note: the connection object between the TaskExecutor and the ResourceManager
 */
resourceManagerConnection = new TaskExecutorToResourceManagerConnection(
/* TODO_MA Note: start */
resourceManagerConnection.start();
——》RegisteredRpcConnection class, start();
▼
/* Note: create the registration object
 */
final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
——》RegisteredRpcConnection class, createNewRegistration
▼
/* Note: generate the registration object, a ResourceManagerRegistration */
RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
↓
TaskExecutorToResourceManagerConnection#generateRegistration()
▲
——back to》RegisteredRpcConnection class, createNewRegistration
/* Note: start the registration */
newRegistration.startRegistration();
——》 RetryingRegistration#startRegistration();
——》 RetryingRegistration#register
/* Note: invoke the registration
 */
CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration
↓
TaskExecutorToResourceManagerConnection#ResourceManagerRegistration#invokeRegistration
/* Note: register the TaskExecutor with the ResourceManager
 */
return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout);
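The startRegistration → register → invokeRegistration chain above revolves around a CompletableFuture that is retried until the remote side accepts. A minimal sketch of that pattern follows, using only the JDK. RetryingRegistrationSketch, its constructor parameters, and the immediate-retry policy are assumptions made for illustration; the real RetryingRegistration class is not reproduced here and may schedule retries differently.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical, simplified retry loop; not Flink's RetryingRegistration. */
class RetryingRegistrationSketch {
    // One call to the supplier = one registration attempt against the remote side.
    private final Supplier<CompletableFuture<String>> invokeRegistration;
    private final int maxAttempts;
    // Completed exactly once: with the first success or with the last failure.
    private final CompletableFuture<String> result = new CompletableFuture<>();

    RetryingRegistrationSketch(Supplier<CompletableFuture<String>> invokeRegistration, int maxAttempts) {
        this.invokeRegistration = invokeRegistration;
        this.maxAttempts = maxAttempts;
    }

    CompletableFuture<String> startRegistration() {
        register(1);
        return result;
    }

    private void register(int attempt) {
        invokeRegistration.get().whenComplete((response, failure) -> {
            if (failure == null) {
                result.complete(response);
            } else if (attempt < maxAttempts) {
                // Assumption: retry immediately, with no delay or backoff.
                register(attempt + 1);
            } else {
                result.completeExceptionally(failure);
            }
        });
    }
}

class RetryingRegistrationDemo {
    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        // Simulated remote side: rejects the first two attempts, accepts the third.
        RetryingRegistrationSketch registration = new RetryingRegistrationSketch(() -> {
            if (calls.incrementAndGet() < 3) {
                CompletableFuture<String> failed = new CompletableFuture<>();
                failed.completeExceptionally(new IllegalStateException("not ready"));
                return failed;
            }
            return CompletableFuture.completedFuture("registered");
        }, 5);
        System.out.println(registration.startRegistration().join()); // prints registered
        System.out.println(calls.get());                             // prints 3
    }
}
```

Returning a single result future lets the caller attach success and failure handlers once, regardless of how many attempts were needed.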
↓
ResourceManager#registerTaskExecutor
/* Note: register
 */
return registerTaskExecutorInternal(taskExecutorGateway,
——》ResourceManager#registerTaskExecutorInternal
▼
/* Note: this is where the registration actually completes
 * key = ResourceID
 * value = WorkerRegistration
 */
taskExecutors.put(taskExecutorResourceId, registration);
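On the ResourceManager side, the registration ends up as an entry in a map keyed by resource ID. A small sketch of such a registry follows. WorkerRegistrySketch and its nested Registration type are hypothetical; plain strings stand in for ResourceID and WorkerRegistration. The choice to replace an older entry when the same ID registers again is an assumption of this sketch, not something the notes state.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical registry modeled on the key/value layout in the notes. */
class WorkerRegistrySketch {
    /** Stand-in for WorkerRegistration; holds only an address here. */
    static final class Registration {
        final String address;

        Registration(String address) {
            this.address = address;
        }
    }

    // key = resource ID (a String in this sketch), value = registration record
    private final Map<String, Registration> taskExecutors = new HashMap<>();

    /** Registers a worker; returns true if an older registration was replaced. */
    boolean register(String resourceId, String address) {
        // Assumption: a re-registration under the same ID replaces the old entry.
        Registration old = taskExecutors.remove(resourceId);
        taskExecutors.put(resourceId, new Registration(address));
        return old != null;
    }

    /** Address of a registered worker, or null if the ID is unknown. */
    String addressOf(String resourceId) {
        Registration registration = taskExecutors.get(resourceId);
        return registration == null ? null : registration.address;
    }

    int size() {
        return taskExecutors.size();
    }
}

class WorkerRegistryDemo {
    public static void main(String[] args) {
        WorkerRegistrySketch registry = new WorkerRegistrySketch();
        System.out.println(registry.register("tm-1", "host-a:6122")); // prints false
        System.out.println(registry.register("tm-1", "host-b:6122")); // prints true
        System.out.println(registry.addressOf("tm-1"));               // prints host-b:6122
        System.out.println(registry.size());                          // prints 1
    }
}
```

Keying by resource ID keeps at most one live registration per worker, so a TaskExecutor that reconnects after a leader change does not show up twice.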
/* Note: start the registration
 */
newRegistration.startRegistration();