0
点赞
收藏
分享

微信扫一扫

线程池ThreadPoolExecutor

Executor框架的两级调度模型 
JAVA线程被一对一映射为本地操作系统线程。JAVA线程启动时会创建一个本 地操作系统线程,当JAVA线程终止时,对应的操作系统线程也被销毁回收,而操作系统会调度所有线程 并将它们分配给可用的CPU。 在上层,JAVA程序会将应用分解为多个任务,然后使用应用级的调度器(Executor)将这些任务映射成 固定数量的线程;在底层,操作系统内核将这些线程映射到硬件处理器上。

主要接口:

public interface Executor { void execute(Runnable command); }

public interface ExecutorService extends Executor {
    void shutdown();
    List<Runnable> shutdownNow();
    boolean isShutdown();
<T> Future<T> submit(Callable<T> task);

<T> Future<T> submit(Runnable task, T result);

Future<?> submit(Runnable task);
}

另外该接口有两个重要的实现类:

ThreadPoolExecutor与ScheduledThreadPoolExecutor。

其中ThreadPoolExecutor是线程池的核心实现类,用来执行被提交的任务;而 ScheduledThreadPoolExecutor是一个实现类,可以在给定的延迟后运行任务,或者定期执行命令

线程池其实就是生成者消费者模式。

ThreadPoolExecutor

    //主池控制状态ctl是一个原子整数
    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

    private static final int COUNT_BITS = Integer.SIZE - 3;

    private static final int COUNT_MASK = (1 << COUNT_BITS) - 1;


//线程池的工作状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

   private final BlockingQueue<Runnable> workQueue;   //阻塞队列

    private final ReentrantLock mainLock = new ReentrantLock(); //锁,保证并发安全

    private final HashSet<Worker> workers = new HashSet<>(); //工作集合


    private final Condition termination = mainLock.newCondition();  //关联队列

    private int largestPoolSize;  //最大线程数

    private long completedTaskCount;  //完成任务的数量

    private volatile long keepAliveTime;  //超出核心线程线程数的线程如果处于空闲最大的存活时间

    private volatile ThreadFactory threadFactory;  //创建线程的工厂

    private volatile RejectedExecutionHandler handler; //拒绝策略

    private volatile boolean allowCoreThreadTimeOut; //如果为false(默认),则核心线程即使在空闲时也保持活动状态。如果为true,则核心线程使用keepAliveTime超时等待工作。

    private volatile int corePoolSize; //核心线程数

    private volatile int maximumPoolSize;//最大池大小。由于工作计数实际上存储在计数位中,因此有效限制为maximumPoolSize&count_MASK。


    private static final RejectedExecutionHandler defaultHandler =
        new AbortPolicy();   //默认拒绝策略



//将线程封装成worker中,通过worker调用线程的run方法实现线程调度

private final class Worker      
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
       
       final Thread thread;

       Runnable firstTask;

      volatile long completedTasks;
    }


Execute()方法

包含两个概念字段
workerCount,指示线程的有效数量
runState,指示是否正在运行、正在关闭等
public void execute(Runnable command) {
        if (command == null)     //1.如果为空则抛出空指针异常
            throw new NullPointerException();
        int c = ctl.get();      //获取线程的状态
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))//2.如果小于核心线程数,就直接封装成worker,并调用线程的run方法
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {  //3.如果线程数大于核心线程数就尝试加入阻塞队列,如果加入队列成功
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command)) //(因为自上次检查以来,已有的已死亡)或自进入此方法后,池已关闭。再次检查(因为线程入队的过程中可能线程池停止或其它问题),如果线程池不处于运行状态,就把线程移除拒绝掉
                reject(command);
            else if (workerCountOf(recheck) == 0)//如果没有线程,则启动新线程。
                addWorker(null, false);
        }
        else if (!addWorker(command, false)) //4.如果阻塞队列也满了,尝试加入如果未超过最大线程数就加入成功,否则执行饱和的拒绝策略。
            reject(command);
    }

RejetedExecutionHandler:饱和策略 
当队列和线程池都满了,说明线程池处于饱和状态,那么必须对新提交的任务采用一种特殊的策略来进 行处理。这个策略默认配置是AbortPolicy,表示无法处理新的任务而抛出异常。JAVA提供了4中策略:

1、AbortPolicy:直接抛出异常
2、CallerRunsPolicy:只用调用所在的线程运行任务

3、DiscardOldestPolicy:丢弃队列里近的一个任务,并执行当前任务。
4、DiscardPolicy:不处理,丢弃掉。

Executors可以创建3种类型的ThreadPoolExecutor:SingleThreadExecutor、FixedThreadExecutor 和CachedThreadPool

SingleThreadExecutor

 public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
核心线程数和最大线程数一样,但是阻塞队列是无界的,所以的线程都会阻塞,资源消耗太大

FixedThreadExecutor

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

固定数量的线程,同样是核心线程数大于最大线程数。队列无界

CachedThreadPool

 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }

SynchronousQueue 也是一个队列来的,但它的特别之处在于它内部没有容器,一个生产线程,当它生产产品(即put的时 候),如果当前没有人想要消费产品(即当前没有线程执行take),此生产线程必须阻塞,等待一个消费线 程调用take操作,take操作将会唤醒该生产线程,同时消费线程会获取生产线程的产品(即数据传 递),这样的一个过程称为一次配对过程(当然也可以先take后put,原理是一样的)。

举报

相关推荐

0 条评论