0
点赞
收藏
分享

微信扫一扫

一文带你了解Java线程池(Executor)-下

好了,我们接着上一篇来讲,当然,福利也都会一样赠上

workQueue:

指被提交但未执行的任务队列,它是一个BlockingQueue接口的对象,仅用于存放Runnable对象,根据队列功能分类,在ThreadPoolExecutor的构造函数中可使用以下几种BlockingQueue:

  • 直接提交的队列:该功能由SynchronousQueue对象提供。SynchronousQueue是一个特殊的阻塞队列,它没有容量,每一个插入操作都要等待一个相应的删除操作,反之,每一个删除操作都要等待对应的插入操作。如果使用这个队列,提交的任务不会被真实的保存,而总是将新任务提交给线程执行,如果没有空闲的线程,则尝试创建新的工作线程,如果线程数量已经达到最大值(maximumPoolSize),则执行拒绝策略。因此,使用SynchronousQueue队列,通常要设置很大的maximumPoolSize值,否则很容易执行拒绝策略。

  • 有界的任务队列:有界的任务队列可以使用带有队列最大容量的ArrayBlockingQueue实现。有界队列仅在任务队列装满时,才可能将线程数提升到corePoolSize以上,换言之,除非系统非常繁忙,否则确保核心线程数维持在corePoolSize以内。

  • 无界的任务队列:无界的任务队列可以通过LinkedBlockingQueue类实现,与有界队列相比,除非系统资源耗尽,否则无界的任务队列不存在任务入队失败的情况。若任务创建速度远远大于处理速度,无界队列会快速增长,直到耗尽系统内存。

  • 优先任务队列:优先任务队列是带有执行优先级的队列。它通过PriorityBlockingQueue实现,可以控制任务的执行先后顺序。它是以特殊的无界队列。它可以根据任务自身的优先级顺序来执行任务,在确保系统性能的同时,也能有很好的质量保证(总是确保高优先级的任务先执行)

回顾newFixedThreadPool()方法的实现。它返回了一个corePoolSizemaximumPoolSize大小一样的,使用了LinkedBlockingQueue任务队列的线程池。因为固定大小的线程池不会有线程数量的动态变化。由于它使用无界队列存放无法立即执行的任务,当任务提交非常频繁时,该队列会迅速膨胀,从而耗尽系统资源。

newSingleThreadExecutor()返回的是单线程线程池,是newFixedThreadPool()方法的一种退化,只是简单的将线程池数量设为1。

newCachedThreadPool()方法返回corePoolSize是0,maximumPoolSize无穷大的线程池,这意味着在没有任务时,该线程池内无线程,而当任务被提交时,该线程池会使用空闲的线程执行任务;若无空闲线程,则将任务加入SynchronousQueue队列,而这个队列是一个直接提交的队列,它总会迫使线程池增加新的线程执行任务。当任务执行完毕后,由于corePoolSize是0,因此空闲线程又会在指定时间(60秒)被回收。

对于newCachedThreadPool(),如果同时有大量任务被提交,而任务的执行又不那么快时,那么系统便会开启等量的线程处理,这样做可能会很快耗尽系统的资源。。

这里给出ThreadPoolExecutor线程池的核心调度代码:


/*
 * 先看一下addWorker这个方法
 * @param core  true 使用 corePoolSize 作为上限, 否则使用maximumPoolSize
 * @return 成功返回true
 */
 private boolean addWorker(Runnable firstTask, boolean core);
    
 /**
     * 在未来某时运行给定的任务,任务可能在新线程中运行,也可能在线程池中的线程中运行。
     *
     * 如果一个任务不能被提交(submit),不是因为executor已经被关闭,就是已经达到了executor的maximumPoolSize,
     * 这个任务就会被当前的RejectedExecutionHandler来处理
     *
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
     
        int c = ctl.get();
        //当工作线程总数小于corePoolSizes时
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            //添加新线程失败,重新获取ctl的值
            c = ctl.get();
        }
        //进入等待队列
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //进入等待队列失败(有界队列达到上限或者使用了SynchronousQueue)
        //直接提交给线程池,如果线程数量达到maximumPoolSize,执行拒绝策略
        else if (!addWorker(command, false))
            reject(command);
    }

拒绝策略

ThreadPoolExecutor的最后一个参数指定了拒绝策略。当任务数量超过系统实际承载能力时的处理策略。内置了四种拒绝策略:

  • AbortPolicy:该策略会直接抛出异常,阻止系统正常工作

  • CallerRunsPolicy:只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务。

  • DiscardOledestPolicy策略:该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务

  • DiscardPolicy策略:该策略默默地丢弃无法处理的任务,不予任何处理。

以上内置的策略均实现了RejectedExecutionHandler接口,若以上策略仍无法满足实际应用需要,可以自己扩展RejectedExecutionHandler接口。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

下面的代码简单地演示了自定义线程池和拒绝策略的使用:


public class RejectedThreadPoolDemo {
    private static class MyTask implements Runnable {

        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:"
             + Thread.currentThread().getId());
            try{
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10), Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    @Override
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

上述的代码定义了一个线程池。该线程池有5个常驻线程,并且最大线程数量也是5个。但它有一个只有10个容量的等待队列。因此使用无界队列很可能并不是最佳解决方案,如果任务量极大,很可能会把内存撑爆。给出一个合理的队列大小,也是合理的选择。同时,这里自定义了拒绝策略,我们不抛出异常,因为万一在任务提交端没有进行异常处理,则有可能使得整个系统都崩溃瓶,这不是我们希望遇到的,但作为必要的信息记录,我将任务丢弃的信息进行打印。

自定义线程创建:ThreadFactory用来创建线程池需要的线程:

public interface ThreadFactory {
    Thread newThread(Runnable r);
}

当线程池需要新建线程时,就会调用这个方法。自定义线程池可以跟踪何时创建了多少线程,也可以自定义线程的名称、组以及优先级等信息。下面的案例使用自定义的ThreadFactory,一方面记录了线程的创建,另一方面将所有的线程都设置为守护线程:

MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(), new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread t = new Thread(r);
                t.setDaemon(true);
                System.out.println("create " + t);
                return t;
            }
        });
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);

扩展线程池ThreadPoolExecutor是一个可以扩展的线程池。它提供了beforeExecute()、afterExecute()、terminated()三个接口对线程池进行控制。在ThreadPoolExecutor.Worker.runTask()方法内部提供了这样的实现:

boolean ran = false;
beforeExecute(thread,task); //运行前
try{
    task.run();  //运行任务
    ran = true;
    afterExecute(task,null);  //运行结束后
    ++completedTasks;
}catch (RuntimeException ex) {
    if (!ran)
        afterExecute(task,ex);    //运行结束
    throw ex;
}

下面演示了对线程池的扩展,在这个扩展中,我们记录每一个任务的执行日志

package com.ha;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ExtThreadPool {
    private static class MyTask implements Runnable{
        public String name;
        public MyTask(String name){
            this.name = name;
        }

        @Override
        public void run() {
            System.out.println("正在执行" + ":Thread ID:" + Thread.currentThread().getId()
            + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()){
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:"+((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:"+((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出");
            }
        };

        for (int i = 0; i < 5; i++) {
            MyTask task = new MyTask("TASK-"+i);
            es.execute(task);
            Thread.sleep(10);
        }

        es.shutdown();
    }
}

输出:

准备执行:TASK-0
正在执行:Thread ID:10,Task Name=TASK-0
准备执行:TASK-1
正在执行:Thread ID:11,Task Name=TASK-1
准备执行:TASK-2
正在执行:Thread ID:12,Task Name=TASK-2
准备执行:TASK-3
正在执行:Thread ID:13,Task Name=TASK-3
准备执行:TASK-4
正在执行:Thread ID:14,Task Name=TASK-4
执行完成:TASK-0
执行完成:TASK-1
执行完成:TASK-2
执行完成:TASK-3
执行完成:TASK-4
线程池退出

在线程池中寻找堆栈

首先看一个简单的案例,我们有一个Runnable接口,它用来计算两个数的商:

package com.ha;

import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DivTask implements Runnable{
    int a,b;


    public DivTask(int a, int b) {
        this.a = a;
        this.b = b;
    }

    @Override
    public void run() {
        double re = a/b;
        System.out.println(re);
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,
                0L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pools.submit(new DivTask(100,i));
        }
    }
}

输出:

100.0
50.0
33.0
25.0

从这个for循环来看,我们应该会得到5个结果,但是实际上只有4个。也就是说程序漏算了一组数据!但是没有任何日志,没有任何错误提示。在这个简单的案例中,只要仔细一点,就会发现,作为除数的i取到了0,这个缺失的值很可能是由于除以0导致的。但是在复杂的业务场景中,这种错误很难发现。

因此,使用线程池虽然是好事,但是还是得处处留意这些坑。线程池和可能会“吃”掉程序抛出的异常,导致我们对程序的错误一无所知。

那么,如何向线程池讨回异常堆栈呢?一种最简单的方法及时放弃submit(),改用execute():pools.execute(new DivTask(100,i));或者使用下的方法改造submit():

Future re = pools.submit(new DivTask(100,i));
re.get();

输出

100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero50.0

    at com.ha.DivTask.run(DivTask.java:18)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)
33.0
25.0

注意了,任务的具体提交位置已经被线程池淹没了,顺着堆栈,只能找到线程池中的调度流程,而这对于我们几乎是没有价值的。

我们只好扩展ThreadPoolExecutor线程池,让它在调度任务之前,先保存一下提交任务线程的堆栈信息。如下所示:

package com.ha;

import java.util.concurrent.*;

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    private Exception clientTrace(){
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task,final Exception clientStack,
                          String clientThreadName){
        return new Runnable() {
            @Override
            public void run() {
                try{
                    task.run();
                }catch (Exception e){
                    clientStack.printStackTrace();
                    throw e;
                }
            }
        };
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    public static void main(String[] args) {
        ThreadPoolExecutor pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,
                TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0; i < 5; i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

wrap()方法的第2个参数为一个异常,里面保存着提交任务的线程的堆栈信息。该方法将我们传入的Runnable任务进行一层包装,使之能处理异常信息。当任务发生异常时,这个异常会被打印。

输出:

100.0
java.lang.Exception: Client stack trace
50.0
33.0
25.0
    at com.ha.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:11)
    at com.ha.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:31)
    at com.ha.TraceThreadPoolExecutor.main(TraceThreadPoolExecutor.java:44)
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
    at com.ha.DivTask.run(DivTask.java:18)
    at com.ha.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:20)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:722)

现在,我们不仅可以得到异常发生的Runnable实现内的信息,我们也知道了这个任务是在哪里提交的。

举报

相关推荐

0 条评论