0
点赞
收藏
分享

微信扫一扫

快速搞定线程池源码


关注“Java后端技术全栈

回复“面试”获取最新资料

回复“加群”邀您进技术交流群

何为线程池?

顾名思义,线程池就是存放一定量线程的容器,当有待执行任务的时候直接从线程池中取出线程执行任务,任务执行完成之后将线程回放至线程池中。

线程池的优点:降低了线程频繁创建、销毁的开销,提高系统的响应速度,方便统一管理创建的线程。

java.util.concurrent.ThreadPoolExecutor

线程池(ThreadPoolExecutor)提供 4 个默认的构造方法,固定参数 5 个。如下图:

快速搞定线程池源码_devops

核心参数解释如下:

  • 核心线程数量:corePoolSize
  • 最大线程数量:maximumPoolSize
  • 非核心线程空闲等待时间:keepAliveTime
  • 等待时间单位:timeUnit
  • 等待阻塞队列:blockingQueue<Runnable>
  • (可选)线程工厂创建线程:threadFactory
  • (可选)线程池拒绝策略:rejectedExecutionHandler

线程池原理

快速搞定线程池源码_devops_02

corePoolSize

线程池中默认存活的线程数量。不同的线程池对于核心线程数量有不同的要求,也与 allowCoreThreadTimeout 参数有关。

当 allowCoreThreadTimeout= true 时,核心线程没有任务且存活时间超过空闲等待时间后终止。

maximumPoolSize

线程池中允许的最大线程数量。

当 currentThreadNumber >= corePoolSize,且任务队列已满时,线程池会创建新线程来处理任务;当 currentThreadNumber =maxPoolSize,且任务队列已满时,线程池会拒绝处理任务而抛出异常。

keepAliveTime

线程池中空闲线程允许存活的时间,超过配置时间将会被终止。

  • blockingQueue:线程池缓存队列存放待处理的线程任务。
  • ArrayBlockingQueue:指定大小的等待队列(FIFO);有界队列,创建时需要指定队列的大小。
  • LinkedBlockingQueue:基于链表的等待队列(FIFO);无界队列,创建时不指定队列大小,默认为 Integer.MAX_VALUE。
  • PriorityBlockingQueue:带有优先级的等待队列
  • SynchronizedQueue:不存放对象的等待队列;同步移交队列,直接新建一个线程来执行新来的任务。

threadFactory

线程工厂,创建线程使用。

rejectedExecutionHandler

当线程池的任务缓存队列已满并且线程池中的线程数目达到 maximumPoolSize 时,如果还有任务到来就会采取任务拒绝策略,通常有以下四种策略:

  • AbortPolicy(默认策略):丢弃任务并抛出 RejectedExecutionException。
  • CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。显然这样做不会真的丢弃任务,但是,任务提交线程的性能极有可能会急剧下降。
  • DiscardOldestPolicy:丢弃队列中最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务(舍弃最老的请求,即将队列头部任务舍弃)。
  • DiscardPolicy:不做任何处理,直接丢弃任务。

怎么创建线程池?

线程池的创建主要有 2 种方式

  • 基于 ThreadPoolExecutor 的构造方法创建
  • Executors 执行器创建

Executors 执行器创建线程池是在 ThreadPoolExecutor 构造方法上进行简单的封装,特殊场景根据需要自行创建。可以把Executors理解成一个工厂类 。

阿里开发规范中是建议使用ThreadPoolExecutor 来创建线程池。

// ThreadPoolExecutor 最基础的构造方法
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    // 核心线程、最大线程数量必须大于 0,且 最大线程数量 大于等于 核心线程数量,空闲等待时间大于 0
    if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ? null : AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

常见线程池有以下 4 种:

  • newFixedThreadPool:固定大小的线程池
  • newSingleThreadExecutor:单个线程线程池
  • newCachedThreadPool:缓存线程池
  • newScheduledThreadPool:调度线程池

newFixedThreadPool

固定大小的线程池线程数量不存在变化,待处理的任务过多时会存放到缓存队列中。线程池会维护一定数量的线程,当创建的线程过多时阻塞队列中等待执行的线程数量会大量堆积,系统资源不足时容易发生 OOM。

// 创建线程池
ExecutorService threadPool = Executors.newFixedThreadPool(5);
public static ExecutorService newFixedThreadPool(int nThreads) {
        //  固定大小是指: 核心线程数量 = 最大线程数量
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

直接使用线程池构造方法创建固定线程池,线程池中有且仅有固定数量的线程去执行待执行的任务。当“待执行的任务数量 > maximumPoolSize + blockingQueue.size()”时,会抛出异常 java.util.concurrent.RejectedExecutionException。

// 阻塞队列大小为 5
LinkedBlockingQueue<Runnable> blockingQueue = new LinkedBlockingQueue<>(5);
// 创建线程数量为 3 的固定大小线程池
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(3, 3, 0, TimeUnit.SECONDS, blockingQueue);
// 待提交执行任务数量为 10,允许执行任务的数量为 8
for (int i = 0; i< 10; i++) {
    poolExecutor.submit(
        () -> {
            System.out.println(Thread.currentThread().getName());
        });
}

newSingleThreadExecutor

单线程线程池有且仅有一个线程,若有多余的任务提交到线程池中则会被暂存到阻塞队列,待线程空闲时再去执行,当线程在执行过程中失败而终止时,会创建个新的线程去执行缓存队列中的任务。

ExecutorService threadPool = Executors.newSingleThreadExecutor();
public static ExecutorService newSingleThreadExecutor() {
        // 核心线程数、最大线程数都为 1,阻塞队列大小为 Integer.MAX_VALUE
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }

newCachedThreadPool

缓存线程池初始时不存在线程,根据需要创建线程,当线程池中不存在空闲可用线程时会创建新的线程,线程池中超过 60s 且未使用的线程将被终止并删除。因此,合理的空闲等待时间,线程池可以维护一定数量的线程有利于提高性能。

ExecutorService threadPool = Executors.newCachedThreadPool();
public static ExecutorService newCachedThreadPool() {
        // 线程池核心线程数为 0,最大为 Integer.MAX_VALUE,空闲等待时间未 60s
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

newScheduledThreadPool

调度线程池,可以以固定的频率执行任务或者固定的延时执行任务。

ExecutorService threadPool = Executors.newScheduledThreadPool(3);
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        // 设置核心线程数量,默认最大线程数量为 Integer.MAX_VALUE
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }


public ScheduledThreadPoolExecutor(int corePoolSize) {
        // ThreadPoolExecutor 父类构造方法
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

scheduleAtFixedRate:固定频率执行任务,即任务执行的频率保持不变。

// command:需要执行的任务;initialDelay:初始执行延迟时间;period:后续任务执行延迟时间;unit:延迟时间单位
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,long initialDelay,long period,TimeUnit unit);

// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务执行延迟 1s【周期性的操作,period = 0,所有的任务都将在初始延迟后执行,没有周期性可言】
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
          threadPool.scheduleAtFixedRate(
                  ()-> {
                      try {
                          // 假设任务执行时间为 3s,当 workTime <= period 时,延迟 period 时间后执行;当 workTime > period ,延迟 workTime 时间后执行
                          Thread.sleep(3000);
                          System.out.println("scheduleAtFixedRate " + Thread.currentThread().getName() + " " +                                                                                              DateFormat.getTimeInstance().format(new Date()) );
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  },
                  3,
                  1,
                  TimeUnit.SECONDS);

scheduleWithFixedDelay:固定的延时执行任务,指上一次执行成功之后和下一次开始执行的之前的时间。

public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay,TimeUnit unit);

// 创建核心线程为 1 的调度线程池,初始延迟 3s,后续任务将在上一个任务执行成功 1 是后执行;
ScheduledExecutorService threadPool = Executors.newScheduledThreadPool(1);
          threadPool.scheduleWithFixedDelay(
                  ()-> {
                      try {
                          Thread.sleep(1000);
                          System.out.println("scheduleWithFixedDelay " + Thread.currentThread().getName() + "  " +                                                                                      DateFormat.getTimeInstance().format(new Date()) );
                      } catch (InterruptedException e) {
                          e.printStackTrace();
                      }
                  },
                  3,
                  1,
                  TimeUnit.SECONDS);

线程池有哪些状态

通过获取线程池状态,可以判断线程池是否是运行状态、可否添加新的任务以及优雅地关闭线程池等。

快速搞定线程池源码_多线程_03

  • RUNNING: 线程池的初始化状态,可以添加待执行的任务。
  • SHUTDOWN:线程池处于待关闭状态,不接收新任务仅处理已经接收的任务。
  • STOP:线程池立即关闭,不接收新的任务,放弃缓存队列中的任务并且中断正在处理的任务。
  • TIDYING:线程池自主整理状态,调用 terminated() 方法进行线程池整理。
  • TERMINATED:线程池终止状态。

判断线程池状态常用方法总结如下。

shutdown 和 shutdownNow 的联系和区别:

  • shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,不接收新的任务。
  • shutdownNow():立即终止线程池,尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务。

isShutdown、isTerminated、isTerminating 的区别:

  • isShutDown():调用 shutdown() 或 shutdownNow() 方法后返回 true。
  • isTerminated():线程池中不存在任何需要执行的任务时,返回 true。
  • isTerminating() 线程池调用 shutdown() 或 shutdownNow() 后但是没有完全终止返回 true。

常用方法

线程池任务执行 execute()和 submit() 方法。

  • execute():ThreadPoolExecutor 类实现 Executor 接口中的方法,无返回值。
  • submit():ExecutorService 接口中的方法,有返回值,可以利用这一特性对任务的 Future.get() 抛出异常进行处理。

Executor 框架成员及关系图如下:

快速搞定线程池源码_队列_04

线程池监控

使用线程池可以提高系统并发时的吞吐量,提高系统性能。但是使用不当会造成系统资源占用过高,线程池缓存队列堆积大量待执行任务、缓存线程池中存在大量的耗时任务等会造成内存溢出、系统访问缓慢等问题。因此监控线程池就显得极为重要,下面根据源码解读常用方法进行线程池的监控。

核心线程数量:

// 线程池维持线程的最小存活数量与 allowCoreThreadTimeOut 参数有关
public int getCorePoolSize() {
    return corePoolSize;
}

线程池最大的线程数量:

public int getMaximumPoolSize() {
    return maximumPoolSize;
}

线程池最多创建的线程数量:

public int getLargestPoolSize() {
    // 获取主线程锁,获取调用此方法时,线程池中曾创建的最大的线程数量
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        return largestPoolSize;
    } finally {
        mainLock.unlock();
    }
}

// 添加待执行任务方法
private boolean  addWorker(Runnable firstTask, boolean core){
    ...
        if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) {
            if (t.isAlive()) // precheck that t is startable
                throw new IllegalThreadStateException();
            // 添加待执行的任务
            workers.add(w);
            // 获取线程池中存在的工作线程的大小
            int s = workers.size();
            if (s > largestPoolSize)
                // 赋值
                largestPoolSize = s;
            workerAdded = true;
       }
  ...
}

线程池当前存在的线程数量:

public int getPoolSize() {
    // 获取主线程锁,获取调用此方法时,线程池中存在线程数量
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        // Remove rare and surprising possibility of
        // isTerminated() && getPoolSize() > 0
        return runStateAtLeast(ctl.get(), TIDYING) ? 0 : workers.size();
    } finally {
        mainLock.unlock();
    }
}

//  最近运行状态
private static boolean runStateAtLeast(int c, int s) {
       return c >= s;
}

线程池已完成任务数量:

// Worker 类属性获取线程任务计数器
volatile long completedTasks;

public long getCompletedTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        // 遍历线程池中的线程
        for (Worker w : workers)
            n += w.completedTasks;
        return n;
    } finally {
        mainLock.unlock();
    }
}

线程池存在的任务总量:

public long getTaskCount() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        long n = completedTaskCount;
        for (Worker w : workers) {
            n += w.completedTasks;
            // w.islocked() 获取当前任务的状态,调用 isHeldExclusively() 判断
            if (w.isLocked())
                ++n;
        }
        return n + workQueue.size();
    } finally {
        mainLock.unlock();
    }
}

//  Work 内部类
public void lock()        { acquire(1); } // 加锁
public boolean tryLock()  { return tryAcquire(1); }// 尝试获取锁
public void unlock()      { release(1); }// 释放锁

public boolean isLocked() { return isHeldExclusively(); }
// The value 0 represents the unlocked state.
// The value 1 represents the locked state.
protected boolean isHeldExclusively() {
    return getState() != 0;
}

好了,以上便是今天对线程池源码相关的分享。

码字不易。老田期待你来个:点赞...转发...

快速搞定线程池源码_devops_05


举报

相关推荐

0 条评论