0
点赞
收藏
分享

微信扫一扫

手撕线程池 ThreadPool

worker.start(); //线程执行执行任务

workers.add(worker) ;

} else { //如果我们的任务数 > coreSize , 直接加入任务队列

taskQueue.put(task);

System.out.println(“加入任务队列 :” + task);

}

}

}

public ThreadPool(int coreSize, long timeout, TimeUnit timeUnit,int queueCapacity) {

this.coreSize = coreSize;

this.timeout = timeout;

this.timeUnit = timeUnit;

this.taskQueue = new BlockQueue<>(queueCapacity);

}

//将线程池中的线程包装为一个work类

class Worker extends Thread{

private Runnable task ;

public Worker(Runnable task) {

this.task = task;

}

@Override //执行任务

public void run() {

//task不为空,直接执行任务

//当task不为空任务队列有任务执行,队列中的任务

// while(task != null || (task = taskQueue.take()) != null) //死等,线程池中的线程一直等待获取任务队列中的任务

while(task != null || (task = taskQueue.poll(timeout,timeUnit)) != null){ //等一会儿任务队列没有就不等了,停止线程,下次啥时候来,啥时候再新建

try{

System.out.println(“正在运行任务:” + task);

task.run();

}catch (Exception e){

e.printStackTrace();

}finally {

task = null ;

}

}

synchronized (workers){

System.out.println(“线程执行任务结束,移处线程池 :” + this);

workers.remove(this) ;

}

}

}

}

/*

  • 阻塞队列

*/

class BlockQueue{

//1、任务队列

private Deque queue = new ArrayDeque<>() ;

//2、锁 : 用于锁住对头元素,防止其被多个线程同时获取!

private ReentrantLock lock = new ReentrantLock() ;

//3、生产者条件变量 (1) 任务队列满了,任务(生产者)就会进入WaitSet

private Condition fullWaitSet = lock.newCondition() ;

//4、消费者条件变量 (2) 任务队列为空,线程池中的线程(消费者)进WaitSet

private Condition emptyWaitSet = lock.newCondition() ;

//5、容量

private int capacity ;

public BlockQueue(int capacity) {

this.capacity = capacity;

}

//增强我们线程获取当前队列中的任务的方法,如果队列为空,进入休息室等待(带时限的,不会死等)

public T poll(long timeout , TimeUnit unit){

lock.lock();

try{

long nanos = unit.toNanos(timeout) ; //超时时间统一转换为纳秒

《一线大厂Java面试题解析+后端开发学习笔记+最新架构讲解视频+实战项目源码讲义》无偿开源 威信搜索公众号【编程进阶路】

while(queue.isEmpty()){ //队列为空,线程池中的线程去等待

try {

if(nanos <= 0) return null ; //如果nanos <= 0 说明超时限了,直接唤醒

nanos = emptyWaitSet.awaitNanos(nanos); //如果被提前唤醒,返回nanos的剩余时间

} catch (InterruptedException e) {

e.printStackTrace();

}

}

T task = queue.removeFirst();//队列不为空,获取并且移除队头任务

fullWaitSet.signal(); //有空闲线程,唤醒正在休息的任务

return task ; //返回取到的任务

}

finally {

lock.unlock();

}

}

//线程(消费者)获取任务队列当中的任务

public T take(){

lock.lock();

try{

while(queue.isEmpty()){ //队列为空,线程池中的线程去等待

try {

emptyWaitSet.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

T task = queue.removeFirst();//队列不为空,获取并且移除队头任务

fullWaitSet.signal(); //有空闲线程,唤醒正在休息的任务

return task ;

}

finally {

lock.unlock();

}

}

//生产者向任务队列当中提供任务

public void put(T element){

lock.lock();

try{

while(queue.size() == capacity){ //任务队列已经满了,新任务去等待

try {

fullWaitSet.await();

} catch (InterruptedException e) {

e.printStackTrace();

}

}

queue.addLast(element); //任务队列不满,添加任务到末尾

emptyWaitSet.signal(); //队列中添加一个任务,唤醒一个线程池中正在休息的线程

}finally {

lock.unlock();

}

}

//获取任务队列的大小

public int size(){

lock.lock();

try{

return queue.size(); //返回任务队列长度即可

}finally {

}

}

}

举报

相关推荐

0 条评论