0
点赞
收藏
分享

微信扫一扫

线程池源码简析


 




本文不讲解线程池执行过程过状态的判断,如要阅读可留言索要源码分析代码



基本流程:



用一个AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0))的原子类来保存线程数量和线程池状态,共32位, 高3位保存运行状态, 低29位保存线程数量。其中根据运行状态和线程数量来判断是否需要添加工作线程池。然后调用addWork()创建工作线程Work,每个Work包含thread、firstTask、completedTasks。工作线程创建会调用runWoker()方法来从队列取任务,达到线程复用的效果。



 



线程池怎么达到线程复用?



在调用runWoker时,会判断task是否为空,如果不为空则直接处理,为空则getTask()从队列中获取



/**
  
 
  

   * 执行线程任务的真正处理逻辑
  
 
  

   * 1 如果 task 不为空,则开始执行 task
  
 
  

   * 2 如果 task 为空,则通过 getTask()再去取任务,并赋值给 task,如果取到的 Runnable 不为空,则执行该任务
  
 
  

   * 3 执行完毕后,通过 while 循环继续 getTask()取任务
  
 
  

   * 4 如果 getTask()取到的任务依然是空,那么整个 runWorker()方法执行完毕
  
 
  

   */
  
 
  

   final void runWorker(Worker w) {
  
 
  

       Thread wt = Thread.currentThread();
  
 
  

       Runnable task = w.firstTask;
  
 
  

       // unlock,表示当前 worker 线程允许中断,因为 new Worker 默认的 state=-1,此处是调用
  
 
  

       // Worker 类的 tryRelease()方法,将 state 置为 0,
  
 
  

       //      而 interruptIfStarted()中只有 state>=0 才允许调用中断
  
 
  

       w.firstTask = null;
  
 
  

       w.unlock(); // allow interrupts
  
 
  

       boolean completedAbruptly = true;
  
 
  

       try {
  
 
  

           //注意这个 while 循环,在这里实现了 [线程复用] , 如果 task 为空,则通过getTask 来获取任务
  
 
  
while (task != null || (task = getTask()) != null) {}   // 这里达到了线程复用的效果
 
  

       ....省略.....
  
 
  

   }



 



Worker 继承了 AQS,使用 AQS 来实现独占锁的功能。为什么不使用 ReentrantLock 来实现呢?



worker实现的tryAcquire 方法,它是不允许重入的,而 ReentrantLock 是允许重入的。这样容易中断正在执行的任务,就是 为了避免锁可重入导致正在运行中的任务中断 。



 



线程池提供了多个自定义模板方法,方便监听线程状态或者实现钩子:



protected void beforeExecute(Thread t, Runnable r) { }// 执行前
 
  
protected void afterExecute(Runnable r, Throwable t) { }// 执行后
 
  
void onShutdown() {}//关闭
 
  
protected void terminated() { }//终止



举例子:如果公司需要自定义并行处理框架,如https://gitee.com/jd-platform-opensource/asyncTool,肯定是需要监听线程池状态,然后进行上报,这时候就需要实现beforeExecute、afterExecute、onShutdown来进行监听了



 



 



有一种特殊情况。就是线程池coreSize设置为0时(队列和最大线程数不为0),这时候为什么没有直接拒绝策略?而且采用单线程coreSize=1执行的方式?



--  ???为什么我会发现这个问题?因为我看到xxl-job客户端源码callbackThreadPool 的coreSize=0,所以就很奇怪



因为Woker实现了Runnable接口,并重写了run()方法,如果判断到task即传过来runnable为空,就会从队列里面去一个task来用。这就导致了如果coreSize=0,就会是coreSize=1的效果来表现。



 



```  以下是代码,路径为:execute()->addWorker->run->runWorker->(task = getTask()) != null
  
 
  
// 调用线程池调用的execute()方法
 
  

       .... 省略....
  
 
  

       else if (workerCountOf(recheck) == 0)
  
 
  
addWorker(null, false);
 
  

       .... 省略....
  
 
  

   }
  
 
  
addWorker(Runnable firstTask, boolean core) {
  
 
  

       .... 省略....
  
 
  

       if (workerAdded) {
  
 
  
 
   t.start();// 调用了实现了Runnable接口的worker
 
  

           workerStarted = true;
  
 
  

       }
  
 
  

       .... 省略....
  
 
  

   }
  
 
  
// Worker里面的run()方法
 
  

       runWorker(this);
  
 
  

   }
  
 
  

   final void runWorker(Worker w) {
  
 
  

       .... 省略....
  
 
  
(task = getTask()) != null) {......}
  
 
  

       .... 省略....
  
 
  

   }
  
 
  

   ```


 



以下是一些附件:



- 生命周期流程:




线程池源码简析_线程池


 


- 线程池corePoolSize=0的代码例子:



线程池源码简析_java_02


 


 


- 参考链接: https://www.jianshu.com/p/a977ab6704d7


 

举报

相关推荐

0 条评论