0
点赞
收藏
分享

微信扫一扫

多线程编程

认真的老去 2022-05-02 阅读 51

Callable jdk8api

在这里可以看出几个关键点:

1、可以又返回值 2、可以抛出异常 3、方法不同,run()/ call()

import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

public class CallableTest {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //new Thread(Runnable()).start();
        //new Thread(new FutureTask<V>()).start();
        //new Thread(new FutureTask<V>(Callable)).start();
        MyThread thread = new MyThread();
        // 适配类
        FutureTask<Integer> futureTask = new FutureTask<>(thread);
        new Thread(futureTask,"A").start();
        new Thread(futureTask,"B").start(); //结果会缓存 提高效率
        // 获取 callable 的返回结果 这个get方法会产生阻塞 把他放到最后
        // 或者使用异步通信来处理
        Integer integer = futureTask.get();
        System.out.println(integer);
    }
}

class MyThread implements Callable<Integer> {
    @Override
    public Integer call(){
        System.out.println("call()");
        return 111;
    }
}

1.又缓存 2结果可能需要等待,会阻塞

常用辅助类

1.CountDownLatch

api中是这样介绍的:

  • 允许一个或多个线程等待直到在其他线程中执行的一组操作完成的同步辅助。

    A CountDownLatch用给定的计数初始化。 await方法阻塞,直到由于countDown()方法的调用而导致当前计数达到零,之后所有等待线程被释放,并且任何后续的await 调用立即返回。 这是一个一次性的现象 - 计数无法重置。 如果您需要重置计数的版本,请考虑使用CyclicBarrier

    A CountDownLatch是一种通用的同步工具,可用于多种用途。 一个CountDownLatch为一个计数的CountDownLatch用作一个简单的开/关锁存器,或者门:所有线程调用await在门口等待,直到被调用countDown()的线程打开。 一个CountDownLatch初始化N可以用来做一个线程等待,直到N个线程完成某项操作,或某些动作已经完成N次。

    CountDownLatch一个有用的属性是,它不要求调用countDown线程等待计数到达零之前继续,它只是阻止任何线程通过await ,直到所有线程可以通过。

    示例用法:这是一组类,其中一组工作线程使用两个倒计时锁存器:

    • 第一个是启动信号,防止任何工作人员进入,直到驾驶员准备好继续前进;
    • 第二个是完成信号,允许司机等到所有的工作人员完成。
       class Driver { // ... void main() throws InterruptedException { CountDownLatch startSignal = new CountDownLatch(1); CountDownLatch doneSignal = new CountDownLatch(N); for (int i = 0; i < N; ++i) // create and start threads new Thread(new Worker(startSignal, doneSignal)).start(); doSomethingElse(); // don't let run yet startSignal.countDown(); // let all threads proceed doSomethingElse(); doneSignal.await(); // wait for all to finish } } class Worker implements Runnable { private final CountDownLatch startSignal; private final CountDownLatch doneSignal; Worker(CountDownLatch startSignal, CountDownLatch doneSignal) { this.startSignal = startSignal; this.doneSignal = doneSignal; } public void run() { try { startSignal.await(); doWork(); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } } 

    另一个典型的用法是将问题划分为N个部分,用一个Runnable来描述每个部分,该Runnable执行该部分并在锁存器上倒计时,并将所有Runnables排队到执行器。 当所有子部分完成时,协调线程将能够通过等待。 (当线程必须以这种方式反复倒数时,请改用CyclicBarrier ))

       class Driver2 { // ... void main() throws InterruptedException { CountDownLatch doneSignal = new CountDownLatch(N); Executor e = ... for (int i = 0; i < N; ++i) // create and start threads e.execute(new WorkerRunnable(doneSignal, i)); doneSignal.await(); // wait for all to finish } } class WorkerRunnable implements Runnable { private final CountDownLatch doneSignal; private final int i; WorkerRunnable(CountDownLatch doneSignal, int i) { this.doneSignal = doneSignal; this.i = i; } public void run() { try { doWork(i); doneSignal.countDown(); } catch (InterruptedException ex) {} // return; } void doWork() { ... } } 

    内存一致性效果:直到计数调用之前达到零,在一个线程操作countDown() happen-before以下由相应的成功返回行动await()在另一个线程。

 代码实现

import java.util.concurrent.CountDownLatch;

// 计数器
public class CountDownLatchTest {
    public static void main(String[] args) throws InterruptedException {
        // 总数是6 必须要执行任务的时候,再使用
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName()+" out");
                countDownLatch.countDown(); // 数量减一
            },String.valueOf(i)).start();
        }
        // -1
        countDownLatch.await(); // 等待计数器归零然后向下执行
        System.out.println("close");

    }
}

可以看出他执行的是减法操作 

2.CyclicBarrier

api:

  • 允许一组线程全部等待彼此达到共同屏障点的同步辅助。 循环阻塞在涉及固定大小的线程方的程序中很有用,这些线程必须偶尔等待彼此。 屏障被称为循环 ,因为它可以在等待的线程被释放之后重新使用。

    A CyclicBarrier支持一个可选的Runnable命令,每个屏障点运行一次,在派对中的最后一个线程到达之后,但在任何线程释放之前。 在任何一方继续进行之前,此屏障操作对更新共享状态很有用。

    示例用法:以下是在并行分解设计中使用障碍的示例:

       class Solver { final int N; final float[][] data; final CyclicBarrier barrier; class Worker implements Runnable { int myRow; Worker(int row) { myRow = row; } public void run() { while (!done()) { processRow(myRow); try { barrier.await(); } catch (InterruptedException ex) { return; } catch (BrokenBarrierException ex) { return; } } } } public Solver(float[][] matrix) { data = matrix; N = matrix.length; Runnable barrierAction = new Runnable() { public void run() { mergeRows(...); }}; barrier = new CyclicBarrier(N, barrierAction); List<Thread> threads = new ArrayList<Thread>(N); for (int i = 0; i < N; i++) { Thread thread = new Thread(new Worker(i)); threads.add(thread); thread.start(); } // wait until done for (Thread thread : threads) thread.join(); } } 
    这里,每个工作线程处理矩阵的一行,然后等待屏障,直到所有行都被处理。 当处理所有行时,执行提供的Runnable屏障操作并合并行。 如果合并确定已经找到解决方案,那么done()将返回true ,并且每个工作人员将终止。

    如果屏障操作不依赖于执行方暂停的各方,那么该方可以在释放任何线程时执行该操作。 为了方便这一点,每次调用await()返回该线程在屏障上的到达索引。 然后,您可以选择哪个线程应该执行屏障操作,例如:

       if (barrier.await() == 0) { // log the completion of this iteration } 

    CyclicBarrier对失败的同步尝试使用all-or-none断裂模型:如果线程由于中断,故障或超时而过早离开障碍点,那么在该障碍点等待的所有其他线程也将通过BrokenBarrierException (或InterruptedException)异常离开如果他们也在同一时间被打断)。

    内存一致性效果:线程中调用的行动之前, await() happen-before行动是屏障操作的一部分,进而发生,之前的动作之后,从相应的成功返回await()其他线程。

  • 代码:

  • import java.util.concurrent.BrokenBarrierException;
    import java.util.concurrent.CyclicBarrier;
    
    public class CyclicBarrierTest {
        public static void main(String[] args) {
            CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> {
                System.out.println("召唤神龙");
            });
            for (int i = 1; i <= 7; i++) {
                final int temp = i;
                new Thread(()->{
                    System.out.println(Thread.currentThread().getName()+"收集了"+temp+"个龙珠");
                    try {
                        cyclicBarrier.await(); //等待
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } catch (BrokenBarrierException e) {
                        e.printStackTrace();
                    }
                }).start();
            }
        }
    }

3.Semaphore

api:

  • 一个计数信号量。 在概念上,信号量维持一组许可证。 如果有必要,每个acquire()都会阻塞,直到许可证可用,然后才能使用它。 每个release()添加许可证,潜在地释放阻塞获取方。 但是,没有使用实际的许可证对象; Semaphore只保留可用数量的计数,并相应地执行。

    信号量通常用于限制线程数,而不是访问某些(物理或逻辑)资源。 例如,这是一个使用信号量来控制对一个项目池的访问的类:

       class Pool { private static final int MAX_AVAILABLE = 100; private final Semaphore available = new Semaphore(MAX_AVAILABLE, true); public Object getItem() throws InterruptedException { available.acquire(); return getNextAvailableItem(); } public void putItem(Object x) { if (markAsUnused(x)) available.release(); } // Not a particularly efficient data structure; just for demo protected Object[] items = ... whatever kinds of items being managed protected boolean[] used = new boolean[MAX_AVAILABLE]; protected synchronized Object getNextAvailableItem() { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } return null; // not reached } protected synchronized boolean markAsUnused(Object item) { for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; return true; } else return false; } } return false; } } 

    在获得项目之前,每个线程必须从信号量获取许可证,以确保某个项目可用。 当线程完成该项目后,它将返回到池中,并将许可证返回到信号量,允许另一个线程获取该项目。 请注意,当调用acquire()时,不会保持同步锁定,因为这将阻止某个项目返回到池中。 信号量封装了限制对池的访问所需的同步,与保持池本身一致性所需的任何同步分开。

    信号量被初始化为一个,并且被使用,使得它只有至多一个允许可用,可以用作互斥锁。 这通常被称为二进制信号量 ,因为它只有两个状态:一个许可证可用,或零个许可证可用。 当以这种方式使用时,二进制信号量具有属性(与许多Lock实现不同),“锁”可以由除所有者之外的线程释放(因为信号量没有所有权概念)。 这在某些专门的上下文中是有用的,例如死锁恢复。

    此类的构造函数可选择接受公平参数。 当设置为false时,此类不会保证线程获取许可的顺序。 特别是, 闯入是允许的,也就是说,一个线程调用acquire()可以提前已经等待线程分配的许可证-在等待线程队列的头部逻辑新的线程将自己。 当公平设置为真时,信号量保证调用acquire方法的线程被选择以按照它们调用这些方法的顺序获得许可(先进先出; FIFO)。 请注意,FIFO排序必须适用于这些方法中的特定内部执行点。 因此,一个线程可以在另一个线程之前调用acquire ,但是在另一个线程之后到达排序点,并且类似地从方法返回。 另请注意, 未定义的tryAcquire方法不符合公平性设置,但将采取任何可用的许可证。

    通常,用于控制资源访问的信号量应该被公平地初始化,以确保线程没有被访问资源。 当使用信号量进行其他类型的同步控制时,非正常排序的吞吐量优势往往超过公平性。

    本课程还提供了方便的方法, 一次acquirerelease多个许可证。 当没有公平地使用这些方法时,请注意增加无限期延期的风险。

    内存一致性效应:在另一个线程中成功执行“获取”方法(如acquire()之前,调用“释放”方法之前的线程中的操作,例如release() happen-before

  • 代码:

    import java.util.concurrent.Semaphore;
    import java.util.concurrent.TimeUnit;
    
    public class SemaphoreTest {
        public static void main(String[] args) {
            // 线程数量 限流
            Semaphore semaphore = new Semaphore(3);
            for (int i = 1; i <= 6; i++) {
                new Thread(()->{
                    try {
                        // acquire() 得到
                        semaphore.acquire();
                        System.out.println(Thread.currentThread().getName()+"得到车位");
                        TimeUnit.SECONDS.sleep(1);
                        System.out.println(Thread.currentThread().getName()+"离开车位");
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }finally {
                        // release() 释放
                        semaphore.release();
                    }
                },String.valueOf(i)).start();
            }
        }
    }
    

    semaphore.acquire(); 获得,假设如果已经满了,等待,等待被释放为止semaphore.release();释放,会将当前的信号量释放 + 1,然后唤醒等待的线程

  • 作用:多个共享资源互斥的使用 开发限流,控制最大线程数

  • 读写锁

 代码:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/**
 * 独占锁(写锁) 一次只能被一个线程占有
 * 共享锁(读锁) 多个线程可以同时占有
 * ReadWriteLock 读写锁
 *  读-读 可以共存
 *  读-写 不能共存
 *  写-写 不能共存
 */
public class ReadWriteLockTest {
    public static void main(String[] args) {
       // MyCache myCache = new MyCache();
        MyCacheLock myCache = new MyCacheLock();
        //多个线程写入操作
        for (int i = 1; i <= 5; i++) {
            final int tem = i;
            new Thread(() -> {
                myCache.put(tem + "", tem + "");
            }, String.valueOf(i)).start();
        }
        // 多个线程读取操作
        for (int i = 1; i <= 5; i++) {
            final int tem = i;
            new Thread(() -> {
                myCache.get(tem + "");
            }, String.valueOf(i)).start();
        }

    }
}

/**
 * 自定义缓存 加锁
 */
class MyCacheLock {
    private volatile Map<String, Object> map = new HashMap<>();
    //读写锁 ,更加细粒度的控制
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    // 存,写的时候,只希望同时只有一个线程写
    public void put(String key, Object value) {
        // 加锁
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "写入中" + key);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "写入成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 解锁
            readWriteLock.writeLock().unlock();
        }

    }

    // 取,读
    public void get(String key) {
        readWriteLock.readLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "读取中" + key);
            Object objs = map.get(key);
            System.out.println(Thread.currentThread().getName() + "读取中成功");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            readWriteLock.readLock().unlock();
        }
    }
}

/**
 * 自定义缓存
 */
class MyCache {
    private volatile Map<String, Object> map = new HashMap<>();

    // 存,写
    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "写入中" + key);
        map.put(key, value);
        System.out.println(Thread.currentThread().getName() + "写入成功");
    }

    // 取,读
    public void get(String key) {
        System.out.println(Thread.currentThread().getName() + "读取中" + key);
        map.get(key);
        System.out.println(Thread.currentThread().getName() + "读取中成功");
    }
}

阻塞队列

不得不阻塞:

写入的时候队列满了,就必须阻塞等待 ,取的时候如果队列是空的,必须阻塞等待生产

阻塞队列:

4组api

方式抛出异常不会抛出异常,有返回值阻塞等待超时等待
添加addofferputoffer(,大小,单位)
移除removepolltakepoll(大小,单位)
检测队首元素elementpeek

 /**
     * 抛出异常
     */
    public static void test() {
        // 队列大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.add("a"));
        System.out.println(blockingQueue.add("b"));
        System.out.println(blockingQueue.add("c"));
        //抛出异常 IllegalStateException: Queue full 队列已满
//        System.out.println(blockingQueue.add("d"));
        System.out.println("====================");
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        System.out.println(blockingQueue.remove());
        //NoSuchElementException 没有元素
//        System.out.println(blockingQueue.remove());
    }
    /**
     * 有返回值,没有异常
     */
    public static void test2(){
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        System.out.println(blockingQueue.offer("a"));
        System.out.println(blockingQueue.offer("b"));
        System.out.println(blockingQueue.offer("c"));
        //System.out.println(blockingQueue.offer("d"));// 不抛异常
        System.out.println(blockingQueue.peek()); // 查看队首是谁
        System.out.println("=======================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
    }

    /**
     *  等待,阻塞(一直阻塞)
     */
    public static void test3() throws InterruptedException {
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);
        //一直阻塞
        blockingQueue.put("a");
        blockingQueue.put("b");
        blockingQueue.put("c");
        // blockingQueue.put("d"); // 队列没有位置了,一直阻塞
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take()); // 没有这个元素,一直阻塞
    }

    /**
     * 等待,阻塞(超时等待)
     */
    public static void test4() throws InterruptedException {
        //队列大小
        ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue<>(3);

        blockingQueue.offer("a");
        blockingQueue.offer("b");
        blockingQueue.offer("c");
        //blockingQueue.offer("d",2, TimeUnit.SECONDS); // 等待超过2秒就退出
        System.out.println("===================");
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll());
        System.out.println(blockingQueue.poll(2,TimeUnit.SECONDS)); // 等待超过2秒就退出
    }

SynchronousQueue 同步队列

没有容量,进去一个元素,必须等待取出来之后,才能再往里面放一个元素!

put方法存,take方法取

代码:

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * 同步队列
 * 和其他的 BlockingQueue 不一样 ,SynchronousQueue不存储元素
 * put 了一个元素,必须从里面take取出来,否则不能再put进去值
 */
public class SynchronizedQueueTest {
    public static void main(String[] args) {
        BlockingQueue<String> blockingQueue = new SynchronousQueue<String>();// 同步队列

        new Thread(() -> {
            try {
                System.out.println(Thread.currentThread().getName()+" put 1");
                blockingQueue.put("1");
                System.out.println(Thread.currentThread().getName()+" put 2");
                blockingQueue.put("2");
                System.out.println(Thread.currentThread().getName()+" put 3");
                blockingQueue.put("3");
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T1").start();
        new Thread(() -> {
            try {
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
                TimeUnit.SECONDS.sleep(2);
                System.out.println(Thread.currentThread().getName()+"=>"+blockingQueue.take());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }, "T2").start();
    }
}

线程池

池化技术
程序的运行,本质:占用系统的资源!优化资源的使用!=>池化技术线程池、连接池、内存池、对象池.....创建、销毁。十分浪费资源
池化技术∶事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我。
线程池的好处:
1、降低资源的消耗

2、提高响应的速度

3.方便管理.

线程复用、可以控制最大并发数、管理线程

线程池:

三大方法

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Executors 工具类,3大方法
public class PoolTest {
    public static void main(String[] args) {
        // ExecutorService threadPool = Executors.newSingleThreadExecutor();// 单个线程
        // ExecutorService threadPool = Executors.newFixedThreadPool(5);// 创建一个固定的线程池大小
        ExecutorService threadPool = Executors.newCachedThreadPool();// 可伸缩的,遇强则强,遇弱则弱

        try {
            for (int i = 0; i < 10; i++) {
                // 使用了线程池之后,使用线程池来创建线程
                threadPool.execute(()->{
                    System.out.println(Thread.currentThread().getName()+" 条线程");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }
}

7大参数: 

三大方法的源码

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

本质都是调用的都是ThreadPoolExecutor()

ThreadPoolExecutor的源码:

    public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小
                              int maximumPoolSize, // 最大核心线程池大小
                              long keepAliveTime, // 超时了没有人调用会被释放
                              TimeUnit unit,  // 超时单位
                              BlockingQueue<Runnable> workQueue, // 阻塞队列
                             ThreadFactory threadFactory, // 线程工厂,创建线程的,一般不用动
                              RejectedExecutionHandler handler) { //拒绝策略
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

会发现这个方法里面有7个参数: 

                             int corePoolSize, // 核心线程池大小
                              int maximumPoolSize, // 最大核心线程池大小
                              long keepAliveTime, // 超时了没有人调用会被释放
                              TimeUnit unit,  // 超时单位
                              BlockingQueue<Runnable> workQueue, // 阻塞队列
                             ThreadFactory threadFactory, // 线程工厂,创建线程的,一般不用动
                              RejectedExecutionHandler handler) //拒绝策略

阿里巴巴手册中

 

四种拒绝策略:

 

 

import java.util.concurrent.*;

/** 四大拒绝策略
 *  1.队列满了,还有线程进,则不处理这个线程,抛出异常: ThreadPoolExecutor.AbortPolicy
 *  2.哪来的回哪 ThreadPoolExecutor.CallerRunsPolicy
 *  3.队列满了,丢掉任务,不会抛出异常 ThreadPoolExecutor.DiscardPolicy
 *  4.队列满了,尝试去和最早的竞争,如果竞争失败,也不会抛出异常 ThreadPoolExecutor.DiscardOldestPolicy
 */
public class PoolDemo {
    public static void main(String[] args) {
        // 自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                5, 2,
                TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                // AbortPolicy 的策略 队列满了,还有线程进,则不处理这个线程,抛出异常
                new ThreadPoolExecutor.DiscardOldestPolicy());

        try {
            // 最大承载: 队列 + max (LinkedBlockingDeque + maximumPoolSize)
            for (int i = 0; i < 10; i++) {
                // 使用了线程池之后,使用线程池来创建线程
                threadPool.execute(() -> {
                    System.out.println(Thread.currentThread().getName() + " 条线程");
                });
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // 线程池用完,程序结束,关闭线程池
            threadPool.shutdown();
        }
    }

}

  四大拒绝策略
 1.队列满了,还有线程进,则不处理这个线程,抛出异常: ThreadPoolExecutor.AbortPolicy
  2.哪来的回哪 ThreadPoolExecutor.CallerRunsPolicy
  3.队列满了,丢掉任务,不会抛出异常 ThreadPoolExecutor.DiscardPolicy
4.队列满了,尝试去和最早的竞争,如果竞争失败,也不会抛出异常 ThreadPoolExecutor.DiscardOldestPolicy

最大线程到底该怎么定义(池的最大的大小如何去设置)
CPU密集型:
CPU 密集型,几核,就是几。可以保持CPU的效率最高!
 // 获取CPU的核数
        System.out.println(Runtime.getRuntime().availableProcessors());
        // 自定义线程池
        ExecutorService threadPool = new ThreadPoolExecutor(2,
                Runtime.getRuntime().availableProcessors(), 2,
                TimeUnit.SECONDS,new LinkedBlockingDeque<>(3),
                Executors.defaultThreadFactory(),
                // AbortPolicy 的策略 队列满了,还有线程进,则不处理这个线程,抛出异常
                new ThreadPoolExecutor.DiscardOldestPolicy());

IO密集型 :

最大线程数 > 判断你程序中十分耗IO的线程  IO十分占用资源

推荐视频:【狂神说Java】JUC并发编程最新版通俗易懂_哔哩哔哩_bilibili

举报

相关推荐

0 条评论