0
点赞
收藏
分享

微信扫一扫

[EMQX开源物联网 MQTT 消息服务V3.4.6源码解析系列]-3-内核层监督进程的启动过程


3.1 简介

上一个章节我们看了emqx_sup监督进程启动初始化逻辑如下: 根据emqx的分层逻辑 我们今天按启动顺序来看,先来看内核层的逻辑

emqx_kernel_sup监督进程树启动的逻辑

init([]) ->
%% Kernel Sup
KernelSup = supervisor_spec(emqx_kernel_sup),
%% Router Sup
RouterSup = supervisor_spec(emqx_router_sup),
%% Broker Sup
BrokerSup = supervisor_spec(emqx_broker_sup),
%% Session Manager
SMSup = supervisor_spec(emqx_sm_sup),
%% Connection Manager
CMSup = supervisor_spec(emqx_cm_sup),
%% Sys Sup
SysSup = supervisor_spec(emqx_sys_sup),
{ok, {{one_for_all, 0, 1},
[KernelSup,
RouterSup,
BrokerSup,
SMSup,
CMSup,
SysSup]}}.

内核层监督进程emqx_kernel_sup的启动:

init([]) ->
%%one_for_one如果一个子进程停止,则只重启该进程 最大重启次数是10, 时间片是100秒 100秒内可以重启10次
{ok, {{one_for_one, 10, 100},
%% Gproc 是 Erlang 的进程池 监督进程
[child_spec(emqx_pool_sup, supervisor),
%% 插件钩子管理的 工作进程
child_spec(emqx_hooks, worker),
%% 用于统计的工作进程
child_spec(emqx_stats, worker),
%% 用于统计的工作进程
child_spec(emqx_metrics, worker),
%% 命令行管理的工作进程
child_spec(emqx_ctl, worker),
%% EMQ X 使用 Zone 来管理配置组。一个 Zone 定义了一组配置项 (比如最大连接数等),Listener 可以指定使用某个 Zone,以使用该 Zone 下的所有配置。多个 Listener 可以共享同一个 Zone 配置的匹配规则如下,其优先级 Zone > Global > Default:
child_spec(emqx_zone, worker)]}}.

child_spec(M, worker) ->
%% 进程id。用于监督者区分子进程的id,必须保证唯一 ,这里直接使用模块名字
#{id => M,
%% StartFun = {M, F, A} 用于监督者启动子进程的函数。M代表模块,F代表函数,A代表参数
start => {M, start_link, []},
%% 进程重启策略 Restart = permanent | transient | temporary 表示进程遇到错误之后是否重启 permanent 遇到任何错误导致进程终止就重启 ,temporary 进程永远都不重启 transient 只有进程异常终止的时候会被重启
restart => permanent,
%% 进程关闭表示采用什么手段来关闭进程 Shutdown = brutal_kill | int() >= 0 | infinity brutal_kill 立刻强制关闭进程int() >= 0 等待多少毫秒后强制关闭进程 infinity 当子进程也是监督者时使用,意思是给足够时间让子进程重启
shutdown => 5000,
%% 进程类型 Type = worker | supervisor 告诉监督者子进程是哪种类型的进程,工作进程,还是监督进程?
type => worker,
%% 进程模块 Modules = [Module] | dynamic 表示进程运行依赖哪些模块,仅在代码热更新时使用。使用dynamic的情况是当使用了 Erlang/OTP 发布(Release)等功能,使得Erlang/OTP 可以判断在热更新时需要哪些模块
modules => [M]};

child_spec(M, supervisor) ->
#{id => M,
start => {M, start_link, []},
restart => permanent,
shutdown => infinity,
type => supervisor,
modules => [M]}.

3.2 进程池监督进程的启动

刚刚emqx_pool_sup进程池的启动的MFA,对应如下代码

%% @doc Start the default pool supervisor.
start_link() ->
start_link(?POOL, random, {?POOL, start_link, []}).

然后对应了如下重载函数

start_link(Pool, Type, MFA) ->
start_link(Pool, Type, emqx_vm:schedulers(), MFA).

这里调度器使用erlang的erlang:system_info(schedulers)跟我系统中线程数量是一致的 mac用命令可以查到sysctl -n machdep.cpu.thread_count; 我查到是12 调度器对应逻辑CPU数量

又调用了如下重载函数

start_link(Pool, Type, Size, MFA) ->
%% 新的MFA 获取当前MODULE来启动, 参数为:Pool的值是emqx_pool Type的值是random Size是emqx_vm:schedulers() MFA对应元组{?POOL, start_link, []}
supervisor:start_link(?MODULE, [Pool, Type, Size, MFA]).

初始化代码:

%% 参数为:Pool的值是emqx_pool Type的值是random  Size是emqx_vm:schedulers() Size对应逻辑CPU数量(线程数)  MFA对应元组{?POOL, start_link, []}
init([Pool, Type, Size, {M, F, Args}]) ->
%% 创建gproc_pool对象 类型为random 大小为逻辑CPU数量(线程数)
ensure_pool(Pool, Type, [{size, Size}]),

{ok, {{one_for_one, 10, 3600}, [
begin
ensure_pool_worker(Pool, {Pool, I}, I),
{{M, I}, {M, F, [Pool, I | Args]}, transient, 5000, worker, [M]}
end || I <- lists:seq(1, Size)]}}.
%% one_for_one如果一个子进程停止,则只重启该进程 3600秒内可重启10次
{ok, {{one_for_one, 10, 3600}, [
{{M, I}, %%id
{M, F, [Pool, I | Args]}, %% start MFA
%% restart 进程重启策略 Restart = permanent | transient | temporary 表示进程遇到错误之后是否重启 permanent 遇到任何错误导致进程终止就重启 ,temporary 进程永远都不重启 transient 只有进程异常终止的时候会被重启
transient,
%% 进程关闭表示采用什么手段来关闭进程 Shutdown = brutal_kill | int() >= 0 | infinity brutal_kill 立刻强制关闭进程int() >= 0 等待多少毫秒后强制关闭进程 infinity 当子进程也是监督者时使用,意思是给足够时间让子进程重启
5000,
worker,
[M]}
]}}.
#{id       => M,
start => {M, start_link, []},
restart => permanent,
shutdown => 5000,
type => worker,
modules => [M]};



举报

相关推荐

0 条评论