0
点赞
收藏
分享

微信扫一扫

并发编程 - 线程池的使用总结


文章目录

  • ​​为什么要使用线程池​​
  • ​​ThreadPoolExecutor 的策略​​
  • ​​ThreadPoolExecutor​​
  • ​​最佳线程数​​
  • ​​经验值​​
  • ​​最佳线程数目算法​​
  • ​​常用 BlockingQueue​​
  • ​​内置拒绝策略​​
  • ​​两种情况会拒绝处理任务​​
  • ​​Executors 工厂类​​
  • ​​可缓存线程池 CachedThreadPool​​
  • ​​定长线程池 FixedThreadPool​​
  • ​​SingleThreadPool​​
  • ​​ScheduledThreadPool​​
  • ​​线程池使用场景​​
  • ​​使用 Executors 线程池的弊端​​
  • ​​awaitTermination方法说明​​
  • ​​类关系梳理​​

为什么要使用线程池

  • 创建/销毁线程伴随着系统开销,过于频繁的创建/销毁线程,会很大程度上影响处-理效率
  • 线程并发数量过多,抢占系统资源从而导致阻塞
  • 对线程进行一些简单的管理

 

ThreadPoolExecutor 的策略

并发编程 - 线程池的使用总结_等待时间

当一个任务被添加进线程池时,执行策略

  1. 线程数量未达到corePoolSize,则新建一个线程(核心线程)执行任务
  2. 线程数量达到了corePools,则将任务移入队列等待
  3. 队列已满,新建线程(非核心线程)执行任务
  4. 队列已满,总线程数又达到了maximumPoolSize,就会由(RejectedExecutionHandler)抛出异常
  5. 当线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止。这样,线程池可以动态的调整池中的线程数

一句话概况: 提交的任务进来后,会 先检查核心线程池是否已满,如果 满了就添加到阻塞队列中,如果队列也满了,那么就会开始 创建非核心线程 执行任务,如果非核心线程的 数量达到maxPoolSize,就开始执行 拒绝策略。

 

ThreadPoolExecutor

/**
* @param corePoolSize 线程池中核心线程数量, <em>IO密集型的核心线程为系统线程数的2N+1倍,cpu密集型(计算密集型)的为N+1倍</em>
* 线程池在完成初始化后,默认情况下,线程池中并没有任何线程,线程池会等待有任务到来时再创建新线程去执行任务。
* @param maximumPoolSize 线程池中最大线程数量
* @param keepAliveTime 线程池中非核心线程闲置超时时长
* @param unit keepAliveTime 的时间单位
* @param workQueue 线程等待队列. 被提交但尚未被执行的任务
* @param threadFactory 线程创建工厂. 用于创建线程, 一般用默认的即可
* @param handler 拒绝策略. 当任务太多来不及处理, 如何拒绝任务
* */
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
}

  • 刚开始创建了一个线程池,此时是没有任何线程的。但是如果调用了 prestartAllCoreThreads 或者 prestartCoreThread 方法,就会在没有任务到来时创建线程,前者是创建 corePoolSize 个线程,后者是只创建一个线程。​​Java 线程池​​

 

最佳线程数

经验值

配置线程数量之前,首先要看任务的类型是 IO密集型,还是CPU密集型?

  • 什么是IO密集型?比如:频繁读取磁盘上的数据,或者需要通过网络远程调用接口。
  • 什么是CPU密集型?比如:非常复杂的调用,循环次数很多,或者递归调用层次很深等。

IO密集型配置线程数经验值是:2N,其中N代表CPU核数。

  • I/O 密集型任务,就说明需要较多的等待,这个时候可以参考 Brain Goetz 的推荐方法 线程数 = CPU核数 × (1 + 平均等待时间/平均工作时间)。参考值可以是 N(CPU) 核数 * 2。

CPU密集型配置线程数经验值是:N + 1,其中N代表CPU核数。

  • CPU 密集型任务,那么就意味着 CPU 是稀缺资源,这个时候我们通常不能通过增加线程数来提高计算能力,因为线程数量太多,会导致频繁的上下文切换,一般这种情况下,建议合理的线程数值是 N(CPU)数 + 1。

当然,这只是一个参考值,具体的设置还需要根据实际情况进行调整,比如可以先将线程池大小设置为参考值,再观察任务运行情况和系统负载、资源利用率来进行适当调整。

如何获取N的值?

int availableProcessors = Runtime.getRuntime().availableProcessors();

​那么问题来了,混合型(既包含IO密集型,又包含CPU密集型)的如何配置线程数?​

混合型如果IO密集型,和CPU密集型的执行时间相差不太大,可以拆分开,以便于更好配置。如果执行时间相差太大,优化的意义不大,比如IO密集型耗时60s,CPU密集型耗时1s。

最佳线程数目算法

除了上面介绍是经验值之外,其实还提供了计算公式:

​最佳线程数目 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目​

很显然线程等待时间所占比例越高,需要越多线程。线程CPU时间所占比例越高,需要越少线程。

虽说最佳线程数目算法更准确,但是线程等待时间和线程CPU时间不好测量,实际情况使用得比较少,一般用经验值就差不多了。再配合系统压测,基本可以确定最适合的线程数。

 

  • ​核心线程数量设置技巧​​​: ​​java线程池大小为何会大多被设置成CPU核心数+1?​​
  • ​​面试问,创建多少个线程合适?​​
  • ​​线程池最佳线程数量到底要如何配置?​​

 

常用 BlockingQueue

  • SynchronousQueue(直接交换)​: 总是将任务提交给线程执行, 如果没有空闲的线程,则尝试创建新的线程,如果线程数量已经达到最大值,则执行拒绝策略。 因此使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行异常策略。
  • LinkedBlockingQueue(无界队列)​: 这个队列接收到任务的时候,如果当前线程数小于核心线程数,则新建线程(核心线程)处理任务;如果当前线程数等于核心线程数,则进入队列等待。由于这个队列没有最大值限制,即所有超过核心线程数的任务都将被添加到队列中,这也就导致了maximumPoolSize的设定失效,因为总线程数永远不会超过corePoolSize
  • ArrayBlockingQueue(有界队列): 可以限定队列的长度,接收到任务的时候,如果没有达到corePoolSize的值,则新建线程(核心线程)执行任务,如果达到了,则入队等候,如果队列已满,则新建线程(非核心线程)执行任务,又如果总线程数到了maximumPoolSize,并且队列也满了,则执行拒绝策略
  • DelayQueue: 队列内元素必须实现Delayed接口,这就意味着你传进去的任务必须先实现Delayed接口。这个队列接收到任务时,首先先入队,只有达到了指定的延时时间,才会执行任务

 

内置拒绝策略

  • AbortPolicy策略: 该策略会直接抛出异常,阻止系统正常工作(​​默认拒绝策略​​)
  • CallerRunsPolicy策略: 只要线程池未关闭, 该策略直接在调用者线程中,运行当前被丢弃的任务
  • DiscardOledestPolicy策略: 该策略将丢弃最老的一个请求, 也就是即将被执行的一个任务, 并尝试再次提交当前任务
  • DiscardPolicy策略: 该策略默默地丢弃无法处理的任务, 不予任务处理

实现​​RejectedExecutionHandler​​接口,可自定义拒绝策略

public class ThreadPoolExecutor extends AbstractExecutorService {
public static class CallerRunsPolicy implements RejectedExecutionHandler {}

public static class AbortPolicy implements RejectedExecutionHandler {}

public static class DiscardPolicy implements RejectedExecutionHandler {}

public static class DiscardOldestPolicy implements RejectedExecutionHandler {}
}

 

两种情况会拒绝处理任务

  • 当线程数已经达到maxPoolSize,切队列已满,会拒绝新任务
  • 当线程池被调用shutdown()后,会等待线程池里的任务执行完毕,再shutdown。如果在调用shutdown()和线程池真正shutdown之间提交任务,会拒绝新任务。线程池会调用rejectedExecutionHandler来处理这个任务。如果没有设置默认是AbortPolicy,会抛出异常

 

Executors 工厂类

可缓存线程池 CachedThreadPool

public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}

public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}

  • 该方法返回一个可根据实际情况调整线程数量的线程池。线程池的线程数量不确定,但若有空闲线程可以复用,则会优先使用可复用的线程。若所有线程均在工作,又有新的任务提交则会创建新的线程处理任务。所有线程在当前任务执行完毕后,将返回线程池进行复用

 

定长线程池 FixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}

  • 该方法返回一个固定线程数量的线程池,该线程池中的线程数量始终不变。当有一个新的任务提交时,线程池中若有空闲线程,则立即执行。若没有,则新的任务会被暂存在一个任务队列中,待有线程空闲时,便处理在任务队列中的任务

 

SingleThreadPool

public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}

  • 有且仅有一个工作线程执行任务
  • 所有任务按照指定顺序执行,即遵循队列的入队出队规则

 

ScheduledThreadPool

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
// ...
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}

public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
// ...
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}

  • 不仅设置了核心线程数,最大线程数也是Integer.MAX_VALUE
  • 这个线程池是上述4个中为唯一个有延迟执行和周期执行任务的线程池

 

线程池使用场景

  • newCachedThreadPool: 用来创建一个可以无限扩大的线程池,适用于服务器负载较轻,执行很多短期异步任务
  • newFixedThreadPool: 创建一个固定大小的线程池,因为采用无界的阻塞队列,所以实际线程数量永远不会变化,适用于可以预测线程数量的业务中,或者服务器负载较重,对当前线程数量进行限制
  • newSingleThreadExecutor: 创建一个单线程的线程池,适用于需要保证顺序执行各个任务,并且在任意时间点,不会有多个线程是活动的场景
  • newScheduledThreadPool: 可以延时启动,定时启动的线程池,适用于需要多个后台线程执行周期任务的场景
  • newWorkStealingPool: 创建一个拥有多个任务队列的线程池,可以减少连接数,创建当前可用cpu数量的线程来并行执行,适用于大耗时的操作,可以并行来执行

 

使用 Executors 线程池的弊端

  • FixedThreadPool 和 SingleThreadPool:
    允许的请求队列长度为 Integer.MAX_VALUE,可能会堆积大量的请求,从而导致 OOM
  • CachedThreadPool 和 ScheduledThreadPool:
    允许的创建线程数量为 Integer.MAX_VALUE,可能会创建大量的线程,从而导致 OOM

线程池不允许使用 Executors 去创建,而是通过 ThreadPoolExecutor 的方式,这样的处理方式让写的同学更加明确线程池的运行规则,规避资源耗尽的风险

 

awaitTermination方法说明

当使用awaitTermination时,主线程会处于一种等待的状态,等待线程池中所有的线程都运行完毕后才继续运行。

boolean b = executorService.awaitTermination(3, TimeUnit.SECONDS);

第一个参数指定的是时间,第二个参数指定的是时间单位(当前是秒)。返回值类型为boolean型。

  • 如果等待的时间超过指定的时间,但是线程池中的线程运行完毕,那么awaitTermination()返回true, 继续执行主线程后续代码。
  • 如果等待的时间超过指定的时间,但是线程池中的线程未运行完毕,那么awaitTermination()返回false, 继续执行主线程后续代码。
  • 如果等待时间没有超过指定时间,等待,直到超过设定的时间后继续执行主线程后续代码!

可以用awaitTermination()方法来判断线程池中是否有继续运行的线程。

 

类关系梳理

public interface Executor {
void execute(Runnable command);
}


public class Executors {
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}

// ...
}

public class ThreadPoolExecutor extends AbstractExecutorService {

public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {

this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
// ...
}

public interface ExecutorService extends Executor {
void shutdown();

<T> Future<T> submit(Callable<T> task);

// ...
}


举报

相关推荐

0 条评论