worker.start(); //线程执行执行任务
workers.add(worker) ;
} else { //如果我们的任务数 > coreSize , 直接加入任务队列
taskQueue.put(task);
System.out.println(“加入任务队列 :” + task);
}
}
}
public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) {
this.coreSize = coreSize;
this.timeout = timeout;
this.timeUnit = timeUnit;
this.taskQueue = new BlockQueue<>(queueCapacity);
}
//将线程池中的线程包装为一个work类
class Worker extends Thread{
private Runnable task ;
public Worker(Runnable task) {
this.task = task;
}
@Override //执行任务
public void run() {
//task不为空,直接执行任务
//当task不为空任务队列有任务执行,队列中的任务
// while(task != null || (task = taskQueue.take()) != null) //死等,线程池中的线程一直等待获取任务队列中的任务
while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){ //等一会儿任务队列没有就不等了,停止线程,下次啥时候来,啥时候再新建
try{
System.out.println(“正在运行任务:” + task);
task.run();
}catch (Exception e){
e.printStackTrace();
}finally {
task = null ;
}
}
synchronized (workers){
System.out.println(“线程执行任务结束,移处线程池 :” + this);
workers.remove(this) ;
}
}
}
}
/*
- 阻塞队列
*/
class BlockQueue{
//1、任务队列
private Deque queue = new ArrayDeque<>() ;
//2、锁 : 用于锁住对头元素,防止其被多个线程同时获取!
private ReentrantLock lock = new ReentrantLock() ;
//3、生产者条件变量 (1) 任务队列满了,任务(生产者)就会进入WaitSet
private Condition fullWaitSet = lock.newCondition() ;
//4、消费者条件变量 (2) 任务队列为空,线程池中的线程(消费者)进WaitSet
private Condition emptyWaitSet = lock.newCondition() ;
//5、容量
private int capacity ;
public BlockQueue(int capacity) {
this.capacity = capacity;
}
//增强我们线程获取当前队列中的任务的方法,如果队列为空,进入休息室等待(带时限的,不会死等)
public T poll(long timeout , TimeUnit unit){
lock.lock();
try{
long nanos = unit.toNanos(timeout) ; //超时时间统一转换为纳秒
《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】
while(queue.isEmpty()){ //队列为空,线程池中的线程去等待
try {
if(nanos <= 0) return null ; //如果nanos <= 0 说明超时限了,直接唤醒
nanos = emptyWaitSet.awaitNanos(nanos); //如果被提前唤醒,返回nanos的剩余时间
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();//队列不为空,获取并且移除队头任务
fullWaitSet.signal(); //有空闲线程,唤醒正在休息的任务
return task ; //返回取到的任务
}
finally {
lock.unlock();
}
}
//线程(消费者)获取任务队列当中的任务
public T take(){
lock.lock();
try{
while(queue.isEmpty()){ //队列为空,线程池中的线程去等待
try {
emptyWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
T task = queue.removeFirst();//队列不为空,获取并且移除队头任务
fullWaitSet.signal(); //有空闲线程,唤醒正在休息的任务
return task ;
}
finally {
lock.unlock();
}
}
//生产者向任务队列当中提供任务
public void put(T element){
lock.lock();
try{
while(queue.size() == capacity){ //任务队列已经满了,新任务去等待
try {
fullWaitSet.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
queue.addLast(element); //任务队列不满,添加任务到末尾
emptyWaitSet.signal(); //队列中添加一个任务,唤醒一个线程池中正在休息的线程
}finally {
lock.unlock();
}
}
//获取任务队列的大小
public int size(){
lock.lock();
try{
return queue.size(); //返回任务队列长度即可
}finally {
}
}
}