0
点赞
收藏
分享

微信扫一扫

netty源码解读三(NioEventLoop)

西曲风 2022-04-04 阅读 87
javanetty

NioEventLoop

初始化EventExecutor类型的数组

数组大小默认为cpu数量的两倍,遍历数组,通过 new NioEventLoop(xxx)往数组中添加元素,NioEventLoop继承了EventExecutor;每次需要线程时,执行chooser的next方法从数组中取出一个线程;
关键代码
打开netty源码,找到example包下的EchoService类,追溯创建boss线程组和worker线程组的代码,最终会找到MultithreadEventExecutorGroup的构造方法,其中有两行关键代码用于初始化boss或者worker线程组:
children = new EventExecutor[nThreads];//初始化数组
children[i] = newChild(executor, args);//遍历children数组,初始化每个元素
newChild中用到了策略模式,该方法根据不同的操作系统,有不同的实现;找到NioEventLoopGroup#newChild方法,该方法中会执行new NioEventLoop(…)并返回该实例(所以EventExecutor数组中的每个元素是NioEventLoop类型),其中会将 ThreadPerTaskExecutor实例作为executor,创建selector并且将selector作为属性保存;后续会将任务提交给executor.execute(…)执行;

NioEventLoop如何能生产线程

NioEventLoop封装了ThreadPerTaskExecutor实例,而ThreadPerTaskExecutor封装了DefaultThreadFactory实例与execute方法,而DefaultThreadFactory中的newThread方法会返回new FastThreadLocalThread,FastThreadLocalThread继承了Thread。所以NioEventLoop中可以生产线程,具有接收任务,执行任务的能力;且是在execute方法中调用newThread方法创建新线程并启动的,即任务来了才会创建线程去执行任务。所以NioEventLoop拿到任务后,执行ThreadPerTaskExecutor#execute方法,新建线程执行任务。这一块用到了命令模式;
ThreadPerTaskExecutor类和ThreadPoolExecutor类都属于juc中Executor接口的实现类,后者我们已经非常熟悉了,其实现了Executor接口的execute方法,该方法已经分析过了,非常熟悉;而前者也实现了execute方法(无非是新建线程,启动线程,但这里的线程是FastThreadLocalThread实例,该线程类时netty中特有的,主要是为了配合netty中的fastThreadLocalMap而使用);
其实理解了线程池ThreadPoolExecutor类为啥能生产线程执行任务,就能够理解ThreadPerTaskExecutor类为啥能生产线程执行任务了;原理一模一样;

chooser的next方法

判断一个数是否是2的次方
这个数和这个数的补码取与,若为本身,则是。补码是原码取反加1;
若线程池大小为2的次方,则选择策略是每次a++ 对 线程池大小减1 取与,此时线程池大小减1是几个1,取与后的结果是小于线程池大小减1的值;
若线程池大小不为2的次方,则直接a++对线程池大小取模,取模后的结果是小于线程池大小的值;两种算法的结果都一样(在chm源码中根据key的hash路由规则就是这样的),但上边算法更优秀。这里的线程池大小就是数组的长度;
netty线程池中开启一个新线程必须要带一个任务,否则会报NullPointerException

在这里插入图片描述

NioEventLoopGroup可以看成管理NioEventLoop的线程池组,而NioEventLoop可以看成单线程线程池;

向NioEventLoop提交任务

在initAndRegister中的register方法中会向NioEventLoop中提交一个register0任务,实现代码为eventLoop.execute(r1);其中会调用到SingleThreadEventExecutor#execute方法
1)addTask(r1),将任务r1加入到任务队列中;
2)判断线程已经创建并启动
3)executor.execute(r2),此处的executor为NioEventLoop中封装的ThreadPerTaskExecutor,此时会新建一个线程,执行任务r2;
4)r2中会调用SingleThreadEventExecutor.run()方法,在该方法中会执行任务队列中的任务以及死循环监听;

SingleThreadEventExecutor.run()

自旋方法,负责三块:io事件,普通任务队列中的任务,调度任务队列中任务;SingleThreadEventExecutor.run方法最终会调用NioEventLoop#run方法,这是个for循环的方法,主要是选择不同策略;

select策略选择总结

1)策略选择,根据当前EventLoop中的任务队列是否有任务进行多路复用的select方法选择,如果有任务,则当前线程执行多路复用的select不能阻塞,从而调用selectNow()方法,返回多路复用器上就绪的channel个数,意思是没有响应事件则select方法立即返回,当前线程就要去执行任务队列中的任务了,不能一直阻塞;若任务队列中没有任务,则不调用select方法,直接返回-1,下面会再判断是否有周期任务再决定是否要一直阻塞还是调用超时阻塞;
2)先处理strategy为-1的情况,返回-1,则会再判断周期任务是否还有,如果没有周期任务,则直接调用永久阻塞的select方法直到有事件发生并且返回就绪事件个数;如果有周期任务,则判断还有多久时间t会执行周期任务,如果t不大于0,则直接调用selectNow方法,若t大于0,则调用带超时的select(t)方法,并且返回就绪事件个数;
还有一种特殊情况(算了,暂时不考虑这个特殊情况),刚开始任务队列无任务,strategy为-1,在准备执行周期任务相关判断之前,会再次判断任务队列是否有任务,若此时又有了,则带着strategy为-1继续往下执行,可以看到下边代码并没有处理strategy为-1的情况,即会再次进入for循环。
3)再处理strategy不为-1的情况,此时strategy值代表就绪的channel事件个数,接下来根据strategy值进行不同的判断;分为以下几种情况;

根据策略,执行不同操作

1)ioRatio为100表示io优先
io处理完之后,再处理本地任务;
1.1)strategy大于0
执行processSelectedKeys方法,即处理io事件;
1.2)在finally代码块中执行不带入参的runAllTasks方法,即执行本地任务队列中的任务;
2)当ioRatio不为100且strategy>0,则要根据io耗时来计算处理任务队列中的任务需要的时间, strategy大于0表示当前nioEventLoop的selector上有就绪事件则先执行processSelectedKeys方法,并且记录耗时,再根据此耗时计算处理任务队列中任务的所需时间t1,计算方法很简单,即以(100-ioRatio)/ioRatio作为倍数,再将t1作为入参执行runAllTasks方法,表示执行本地任务队列任务能占用的最大时间;在该方法中,还是会先从调度队列中转移需要被调度的任务至普通任务队列,接着从普通任务队列取任务执行,根据入参时间,从普通任务队列中每执行完64个任务,检查一下耗时是否超过入参规定的时间t1,若超过了,则返回false,退出带超时时间的该方法;
有入参的runAllTasks方法和没有入参的runAllTasks方法的逻辑是不同的,后者是当普通任务队列中的任务执行完了且调度任务队列中的该调度的任务也执行完了,则退出;前者不关心调度任务队列中该调度的任务是否执行完了,比如普通任务队列长度为10,调度任务队列中有20个任务延迟时间到了该调度了,则转移10个至普通任务队列后,就不再转移了,只把这转移的10个任务处理完了,就结束了(因为前者有超时限制);但后者分两次,也要把这20个该调度的任务转移到普通任务队列执行;
3)ioRatio不为100且strategy小于或等于0,此时selector上没有就绪的事件,则调用runAllTasks(0)方法,只处理本地任务,入参为0表示执行最少数量的本地任务,即64个;linux的epoll导致的bug也会使select方法返回的就绪事件个数为0即strategy为0,如何区分这两种情况呢?其实也没做区分,bug导致的结果是空轮询周期极短,以至于普通任务队列中无任务可执行了或者调度队列中任务的延迟时间都没到,没有任务往普通任务队列中转移,此时标志位ranTasks为false,strategy为0,会执行下边步骤4);而selectNow或者select(timeout)函数返回0,来到此方法时,也可能面临普通任务队列中无任务可执行,从而进入到下边步骤4);
4)若ranTasks为false且strategy为0,则可以断定出现了epoll bug
ranTasks表示本次循环是否执行过任务队列中的任务,若没有执行过且selector上没有就绪的事件,此时会执行unexpectedSelectorWakeup方法,其中会先看是否达到重建selector的条件,即若连续出现没有执行过任务队列中的任务且selector上没有就绪的事件的次数达到阈值512次,则可判定出现了空轮训bug,该问题由于selector连续过早的返回,会导致的cpu使用率飙升,从而使机器性能下降;
重建selector是在rebuildSelector0方法中实现的,创建一个新的selector,从旧的selector中拿到事件集合set,遍历该集合,若key为无效状态,或已经注册到新的selector上,则跳过该key,判断下一个,注册时,从key中拿到channel,注册进新的selector上;通过key.isValid()方法可以判断是否有效,官方注释说:在创建时有效,直到被取消、其通道关闭或其选择器关闭为止。而epoll导致的bug恰好是由于channel关闭,所以此时key为无效状态;若重建成功,则最后会将累加次数清零,否则不清零,继续下一轮判断;

processSelectedKeys()方法

遍历就绪事件集合selectedKeys,拿到每个事件的channel,执行processSelectedKey方法;其中判断若是read或accept事件,则调用unsafe.read方法;

执行io所花时间与任务队列时间的分配

ioRatio默认为50,即执行io和任务队列各占50%时间,若ioRatio为100,则表示

runAllTasks()方法

1)在while循环中,将延迟时间到了的调度任务从调度队列转移到普通任务队列中,如果在转移过程中,普通任务队列满了,则将调度任务又放回调度队列中;判断延期时间到的条件是调度任务的截止时间减去当前时间不大于0;有点这个意思如调度任务队列长度为100,普通任务队列长度为10,每次从调度任务队列中取出10个任务加到普通任务队列中执行,当最后调度任务中延迟时间到了的任务转移完了,并且普通任务队列中的任务也执行完了,则结束runAllTasks方法;
2)接着从普通任务队列中取出任务,依次执行

runAllTasks(xxx)带超时的方法

从调度队列中转移需要被调度的任务到普通任务队列,所谓需要被调度的任务是指任务的延迟时间到了,该执行了,判断条件是任务的延迟截止时间减去当前时间不大于0;
1)每执行64个任务,就去判断是否超时,若超时了则返回,若没超时则继续执行;
2)当超时时间为0时,代表最多执行64个任务就返回。

空轮询bug的解决

Linux层面epoll管理的socket,当某个socketChannel出现非正常关闭,会导致一个bug,即在该epoll的事件就绪列表中会增加一个air事件,这个事件jdk没有处理好,会直接唤醒select阻塞,返回0,此时strategy为0,任务队列可能会执行,但最终会一直死循环疯狂调用select方法,但又不阻塞,处理不了任何事情,导致cpu使用率过高,其他线程也无法工作,系统会出问题。
Netty通过selectCnt计数,当select方法被唤醒后,若既没有执行io事件也没有执行任务队列任务,则该值会累加,当连续累加到512次后,netty断定当前selector处于bug状态了,则会处理这个bug,即新建一个selector,遍历原来selector的SelectionKey集合,去除无效的key,再把原来selector上的channel重新注册到新的selector上,这个过程就可以把有问题的channel筛掉。最后for循环中就会在新建的selector上做select操作了。

举报

相关推荐

0 条评论