文章目录
0、ThreadPoolExecutor构造函数中参数的说明
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* 保留在池中的线程数,即使它们是空闲的,除非设置allowCoreThreadTimeOut参数
* @param maximumPoolSize the maximum number of threads to allow in the
* pool 池中允许的最大线程数
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* 当线程数大于corePoolSize时,这是多余的空闲线程,在终止前等待新任务的最长时间
* @param unit the time unit for the {@code keepAliveTime} argument
* keepAliveTime 参数的时间单位
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* 执行前用于保存任务的队列。该队列将仅保存 execute方法提交的 Runnable任务
* @param threadFactory the factory to use when the executor
* creates a new thread
* 执行器创建新线程时使用的工厂
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
看完上面的每一个参数的解释,记一下,后面会用到,当然也可以再回来看
1、java提供的四种线程池如何使用
先准备一个获得线程的工厂类,给四种线程池初始化用,
当然也可以不指定,这里主要是为了区分线程池中每一个线程
class threadPoolFactory implements ThreadFactory {
private AtomicInteger threadIdx = new AtomicInteger(0);
private String threadNamePrefix;
public threadPoolFactory(String Prefix) {
threadNamePrefix = Prefix;
}
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
System.out.println("创建了一个新线程:"+threadNamePrefix+":"+threadIdx);
thread.setName(threadNamePrefix + "--" + threadIdx.getAndIncrement());
return thread;
}
}
(1)newCachedThreadPool(无核心线程,空闲线程60s销毁,无上限数量)
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool(new threadPoolFactory("cachedThread"));
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
}
);
}
}
/**
* Creates a thread pool that creates new threads as needed, but
* will reuse previously constructed threads when they are
* available, and uses the provided
* ThreadFactory to create new threads when needed.
* 创建一个线程池,根据需要创建新线程,但会在之前构建的线程可用时重用它们,
* 并在需要时使用提供的 ThreadFactory 来创建新线程
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
}
结果图
这里需要注意的是所有的输出都结束了,任务也没有停止,直到1分钟任务才停止,正好和ThreadPoolExecutor
构造参数的corePoolSize
、keepAliveTime
、unit
对应上,这里没测最大线程数,看构造参数中maximumPoolSize=Integer.MAX_VALUE
,所以实际项目上不建议用这个,可能会出现oom
的情况
(2)newFixedThreadPool(指定核心线程数)
public static void main(String[] args) {
ExecutorService executorService = Executors.newFixedThreadPool(5, new threadPoolFactory("fixedThreadPool"));
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
}
);
}
}
/**
* Creates a thread pool that reuses a fixed number of threads
* operating off a shared unbounded queue, using the provided
* ThreadFactory to create new threads when needed. At any point,
* at most {@code nThreads} threads will be active processing
* tasks. If additional tasks are submitted when all threads are
* active, they will wait in the queue until a thread is
* available. If any thread terminates due to a failure during
* execution prior to shutdown, a new one will take its place if
* needed to execute subsequent tasks. The threads in the pool will
* exist until it is explicitly {@link ExecutorService#shutdown
* shutdown}.
*建一个线程池,该线程池重用固定数量的线程操作共享无界队列,使用提供的 ThreadFactory 在需要时创建新线程。
在任何时候,至多nThreads 线程将主动处理任务。
如果在所有线程都处于活动状态时提交了额外的任务,它们将在队列中等待,直到有线程可用。
如果任何线程在关机前的执行过程中因失败而终止,如果需要执行后续任务,则新的线程将取代它。
池中的线程将存在,直到它明确地执行 ExecutorService接口中的shutdown
* @param nThreads the number of threads in the pool
* @param threadFactory the factory to use when creating new threads
* @return the newly created thread pool
* @throws NullPointerException if threadFactory is null
* @throws IllegalArgumentException if {@code nThreads <= 0}
*/
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
}
结果图
这里需要注意的是任务不会自己停止,除非你加了 executorService.shutdown();
,根据ThreadPoolExecutor的参数,核心线程数和最大线程数都是一个,再多的任务都是这这些线程跑。多了就只能等待,
(3)newSingleThreadExecutor(单线程的线程池)
public static void main(String[] args) {
// 创建仅有单个线程的线程池
ExecutorService executorService = Executors.newSingleThreadExecutor(new threadPoolFactory("singleThreadPool"));
for (int i = 0; i < 10; i++) {
executorService.submit(() -> {
System.out.println(Thread.currentThread().getName());
}
);
}
}
/**
* Creates an Executor that uses a single worker thread operating
* off an unbounded queue, and uses the provided ThreadFactory to
* create a new thread when needed. Unlike the otherwise
* equivalent {@code newFixedThreadPool(1, threadFactory)} the
* returned executor is guaranteed not to be reconfigurable to use
* additional threads.
*创建一个 Executor,它使用单个工作线程操作无界队列,
并使用提供的 ThreadFactory在需要时创建一个新线程。
与其他等价的newFixedThreadPool(1, threadFactory)不同,
返回的执行程序保证不可重新配置以使用附加线程
* @param threadFactory the factory to use when creating new
* threads
*
* @return the newly created single-threaded Executor
* @throws NullPointerException if threadFactory is null
*/
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
结果图
这里就是单线程的跑,并且不会运行完停止,
又和newFixedThreadPool(1, threadFactory)
有区别,因为有一层new FinalizableDelegatedExecutorService
/**
* A wrapper class that exposes only the ExecutorService methods
* of an ExecutorService implementation.
* 仅公开 ExecutorService 实现的 ExecutorService 方法的包装类
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
//FinalizableDelegatedExecutorService继承了DelegatedExecutorService
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();
}
}
实际上FinalizableDelegatedExecutorService这个类就是对ExecutorService进行了一个包装,防止暴露出不该被暴露的方法,然后加上了finalize方法保证线程池的关闭。
(4) newScheduledThreadPool(核心线程数指定,可以周期性执行)
0)newScheduledThreadPool构造方法
/**
* Creates a thread pool that can schedule commands to run after a
* given delay, or to execute periodically.
* @param corePoolSize the number of threads to keep in the pool,
* even if they are idle
* @param threadFactory the factory to use when the executor
* creates a new thread
* @return a newly created scheduled thread pool
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if threadFactory is null
*/
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
/**
* Creates a new {@code ScheduledThreadPoolExecutor} with the
* given initial parameters.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if {@code corePoolSize < 0}
* @throws NullPointerException if {@code threadFactory} is null
*/
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue(), threadFactory);
}
super也就是下面这个
/**
* Creates a new {@code ThreadPoolExecutor} with the given initial
* parameters and default rejected execution handler.
*
* @param corePoolSize the number of threads to keep in the pool, even
* if they are idle, unless {@code allowCoreThreadTimeOut} is set
* @param maximumPoolSize the maximum number of threads to allow in the
* pool
* @param keepAliveTime when the number of threads is greater than
* the core, this is the maximum time that excess idle threads
* will wait for new tasks before terminating.
* @param unit the time unit for the {@code keepAliveTime} argument
* @param workQueue the queue to use for holding tasks before they are
* executed. This queue will hold only the {@code Runnable}
* tasks submitted by the {@code execute} method.
* @param threadFactory the factory to use when the executor
* creates a new thread
* @throws IllegalArgumentException if one of the following holds:<br>
* {@code corePoolSize < 0}<br>
* {@code keepAliveTime < 0}<br>
* {@code maximumPoolSize <= 0}<br>
* {@code maximumPoolSize < corePoolSize}
* @throws NullPointerException if {@code workQueue}
* or {@code threadFactory} is null
*/
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
所以最后还是ThreadPoolExecutor
再看下面这个
/**
* A wrapper class that exposes only the ScheduledExecutorService
* methods of a ScheduledExecutorService implementation.
*/
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
DelegatedScheduledExecutorService
和FinalizableDelegatedExecutorService
是兄弟关系,都是再次封装了DelegatedExecutorService
,并且额外实现了ScheduledExecutorService
接口用于周期性执行任务
1) 延迟一段时间后执行
public static void main(String[] args) {
// 创建指定核心线程数5
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5, new threadPoolFactory("scheduledThread"));
// 定时执行一次的任务,延迟1s后执行
executorService.schedule(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", delay 1s");
}
}, 1, TimeUnit.SECONDS);
}
结果图
因为这里核心线程数是5,所以任务没有停止,如果设置成0,线程执行完成后就会销毁,任务停止
2)延迟两秒后,三秒一个周期执行一次
public static void main(String[] args) {
// 创建指定核心线程数5
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5, new threadPoolFactory("scheduledThread"));
// 周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
executorService.scheduleAtFixedRate(new Runnable() {
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", every 3s");
}
}, 2, 3, TimeUnit.SECONDS);
}
结果图
这里会有一个问题?
虽然我指定了核心线程数为5,但是根据构造方法得到空闲线程数应该是Integer.MAX_VALUE
,这里为什么不额外创建新线程呢?
看下面的测试用例1(设置2个核心线程,跑10个周期任务)
public static void main(String[] args) {
// 创建指定核心线程数2
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2, new threadPoolFactory("scheduledThread"));
// 10个周期性地执行任务,延迟2s后,每1毫秒一次地周期性执行任务
for (int i = 0; i < 10; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", every 3s");
//休眠此线程
Thread.sleep(20000);
}
}, 2, 1, TimeUnit.MILLISECONDS);
}
}
结果图
你会发现线程卡在这里了,相当于两个核心线程休眠了,就算有新的任务也不会创建新的线程去执行新的周期任务,
下面看测试用例2(0个核心线程数,跑10个任务)
public static void main(String[] args) {
// 创建指定核心线程数0
ScheduledExecutorService executorService = Executors.newScheduledThreadPool(0, new threadPoolFactory("scheduledThread"));
// 周期性地执行任务,延迟2s后,每3s一次地周期性执行任务
for (int i = 0; i < 10; i++) {
executorService.scheduleAtFixedRate(new Runnable() {
@SneakyThrows
@Override
public void run() {
System.out.println(Thread.currentThread().getName() + ", every 3s");
Thread.sleep(20000);
}
}, 2, 1, TimeUnit.MILLISECONDS);
}
}
结果图
这里线程池只创建了一个线程去跑这个任务,所以设置核心线程数等于0,那相当于允许让线程池创建一个线程,并且还是用完就销毁的线程
(5)四种java提供的线程池比较
构造方法 | corePoolSize | maximumPoolSize | keepAliveTime | workQueue |
---|---|---|---|---|
newCachedThreadPool | 0 | Integer.MAX_VALUE | 60s | SynchronousQueue |
newFixedThreadPool | 用户指定 | 和corePoolSize数值一致 | 0 | LinkedBlockingQueue |
newSingleThreadExecutor | 1 | 1 | 0 | LinkedBlockingQueue |
newScheduledThreadPool | 用户指定 | Integer.MAX_VALUE(虽然入参是这个,但是如果corePoolSize=0,则这个实际是1;如果corePoolSize>0,则这个实际是0) | 0 | DelayedWorkQueue |
后面研究一下这些队列
3、自定义创建线程池
看完上面的其实也就知道如何自己创建线程池了
// 自定义创建线程池
ExecutorService executorService =new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new threadPoolFactory("cachedThread"));
使用方法和上面四种线程池是一样的