0
点赞
收藏
分享

微信扫一扫

Flink源码2-TaskManager启动和注册

MaxWen 2021-10-09 阅读 41

1、 上期回顾 0:00:00 ~0:20:00

2、 TaskManager 的启动 0:20:00 ~ 1:39:00

flink-daemon.sh 脚本 38行

CLASS_TO_RUN=org.apache.flink.runtime.taskexecutor.TaskManagerRunner

TaskManager 的启动主类 TaskManagerRunner

两种启动方式:

-----------------------代码开始 - 0:20:00 ~ 1:39:00 ---------------------
TaskManagerRunner类 main 方法
——》TaskManagerRunner类runTaskManagerSecurely
——》TaskManagerRunner类runTaskManager
——》TaskManagerRunner类构造函数

//注释: 线程池
this.executor = java.util.concurrent.Executors
//注释: HA 服务: ZooKeeperHaServices

//初始化 RpcService
//注释: 初始化 HeartbeatServices
//注释: 初始化 BlobCacheService
//注释: 提供外部资源的信息
/ * 注释: 启动 TaskManager
* 1、负责创建 TaskExecutor,负责多个任务Task的运行
/
taskManager = startTaskManager(this.configuration,
——》TaskManagerRunner类startTaskManager

/
第1 步大事: 注释: taskManagerServices = TaskManagerServices
*/
TaskManagerServices taskManagerServices = TaskManagerServices
.fromConfiguration(taskManagerServicesConfiguration, blobCacheService.getPermanentBlobService(), taskManagerMetricGroup.f1, ioExecutor,
fatalErrorHandler);

/ * 第2步大事: 注释: 创建 TaskExecutor 实例
* 内部会创建两个重要的心跳管理器:
* 1、JobManagerHeartbeatManager
* 2、ResourceManagerHeartbeatManager
*/
return new TaskExecutor(rpcService, taskManagerConfiguration,

-----------------------代码结束 1:39:00 ---------------------

3、 TaskManager /TaskExecutor的注册和 心跳 1:50 :00 ~3:18:00

核心入口:

-----------------------代码开始 -接上节 return new TaskExecutor 代码 ---------------------
TaskExecutor 构造函数


——》TaskExecutor Onstart 方法
——主方法 》TaskExecutor #startTaskExecutorServices();

/* 注释: 启动 ResourceManagerLeaderListener
* TaskManger 向 ResourceManager 注册是 ResourceManagerLeaderListener 来完成的,
* 它会监控 ResourceManager 的 leader 变化, 如果有新的 leader 被选举出来,
* 将会调用 notifyLeaderAddress() 方法去触发与 ResourceManager 的重连
*/
resourceManagerLeaderRetriever.start(new ResourceManagerLeaderListener());
new ResourceManagerLeaderListener()
——》TaskExecutor #ResourceManagerLeaderListener#notifyLeaderAddress
——》TaskExecutor #reconnectToResourceManager

* 注释: 链接新的 ResourceManager
*/
tryConnectToResourceManager();
——》 TaskExecutor#connectToResourceManager

* 注释: 构建 TaskExecutorRegistration 对象
*/
final TaskExecutorRegistration taskExecutorRegistration = new

             *  注释: TaskExecutor 和 ResourceManager 之间的链接对象
     */
    resourceManagerConnection = new TaskExecutorToResourceManagerConnection(

                             / *  TODO_MA 注释: 启动
    resourceManagerConnection.start();
         ——》RegisteredRpcConnection类.start();
                                                    ▼
                     *  注释: 创建注册对象
             */
    final RetryingRegistration<F, G, S> newRegistration = createNewRegistration();
                  ——》RegisteredRpcConnection类  createNewRegistration
                                                                 ▼
                                 *  注释: 生成注册对象 ResourceManagerRegistration 
    RetryingRegistration<F, G, S> newRegistration = checkNotNull(generateRegistration());
                                                                     ↓
                                    TaskExecutorToResourceManagerConnection#generateRegistration()
                                                                   ▲
                     
                    ——回来 》RegisteredRpcConnection类  createNewRegistration
                                        *  注释: 开始注册 
                    newRegistration.startRegistration();

                          ——》 RetryingRegistration#startRegistration();
                                      ——》 RetryingRegistration#register
                    *  注释: 调用注册
         */
        CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration
                                                                                        ↓

TaskExecutorToResourceManagerConnection# ResourceManagerRegistration#invokeRegistration
* 注释: 向 ResourceManager 注册 TaskExecutor
*/
return resourceManager.registerTaskExecutor(taskExecutorRegistration, timeout);

ResourceManager#registerTaskExecutor
* 注释: 注册
*/
return registerTaskExecutorInternal(taskExecutorGateway,
——》ResourceManager#registerTaskExecutorInternal

* 注释: 真正完成注册
* key = ResourceID
* value = WorkerRegistration
*/
taskExecutors.put(taskExecutorResourceId, registration);
* 注释: 开始注册
*/
newRegistration.startRegistration();

举报

相关推荐

0 条评论