0
点赞
收藏
分享

微信扫一扫

JUC学习笔记

梅梅的时光 2022-04-19 阅读 58
java

1、关于 wait()方法虚假唤醒的问题

wait()方法一个特点就是在哪里沉睡就会在哪里醒来,所以,如果wait()方法没加在循环里,就会出现一个只有第一次判断生效,第二次则不进行判断,直接往下执行造成虚假唤醒的情况,从而导致数据出错。下图中,把 if改为 while解决虚假唤醒问题

image-20220323152503077

2、线程的定制化通信

image-20220323162329419

// 提供功能
class Th {
    private int flag = 1;
    
    ReentrantLock lock = new ReentrantLock();
    // 提供三个独立的Condition
    Condition c1 = lock.newCondition();
    Condition c2 = lock.newCondition();
    Condition c3 = lock.newCondition();

    public void print5() {
        lock.lock();
        try {
            while (flag != 1) {
                c1.await();
            }
            for (int i = 1; i <= 5; i++) {
                System.out.println(Thread.currentThread().getName() + "::" + i);
            }
            flag = 2;
            c2.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void print10() {
        lock.lock();
        try {
            while (flag != 2) {
                c2.await();
            }
            for (int i = 1; i <= 10; i++) {
                System.out.println(Thread.currentThread().getName() + "::" + i);
            }
            flag = 3;
            c3.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    public void print15() {
        lock.lock();
        try {
            while (flag != 3) {
                c3.await();
            }
            for (int i = 1; i <= 15; i++) {
                System.out.println(Thread.currentThread().getName() + "::" + i);
            }
            flag = 1;
            c1.signal();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }
}

public class dd {
    public static void main(String[] args) {
        Th th = new Th();
        new Thread(() -> {
            for (int i = 0; i < 5; i++) {
                th.print5();
            }
        }, "C1").start();
        new Thread(() -> {
            for (int i = 0; i < 10; i++) {
                th.print10();
            }
        }, "C2").start();
        new Thread(() -> {
            for (int i = 0; i < 15; i++) {
                th.print15();
            }
        }, "C3").start();
    }
}

3、关于ArrayList()线程不安全问题的解决方法

线程不安全条件下的并发操作

 public static void main(String[] args) {
        ArrayList<String> arrayList = new ArrayList<>();
        for (int i = 0; i < 30; i++) {
            new Thread(()->{
                arrayList.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(arrayList);
            },i+"").start();
        }
    }

经测试,不能再 @Test下测试,不会有任何输出。通过上述测试,会报 java.util.ConcurrentModificationException异常

解决方案一:使用 Vector。List的古老实现类,JDK1.0引入

 public static void main(String[] args) {
        List<String> arrayList = new Vector<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                arrayList.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(arrayList);
            }, i + "").start();
        }
    }

解决方案二:使用 Collections工具类

public static void main(String[] args) {
        List<String> arrayList = Collections.synchronizedList(new ArrayList<String>());

        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                arrayList.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(arrayList);
            }, i + "").start();
        }
    }

解决方案三:CopyOnWriteArrayList写时复制技术

原理图:

image-20220323170909726

 public static void main(String[] args) {
        CopyOnWriteArrayList<String> arrayList = new CopyOnWriteArrayList<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                arrayList.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(arrayList);
            }, i + "").start();
        }
    }

4、HashSet()的线程不安全问题

 public static void main(String[] args) {
        HashSet<String> hashSet = new HashSet<>();
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                hashSet.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(hashSet);
            }, i + "").start();
        }
    }

报错:java.util.ConcurrentModificationException并发修改异常

解决方案一:使用 Collections工具类

  public static void main(String[] args) {
        Set<String> hashSet = Collections.synchronizedSet(new HashSet<String>());
        for (int i = 0; i < 30; i++) {
            new Thread(() -> {
                hashSet.add(UUID.randomUUID().toString().substring(0, 8));
                System.out.println(hashSet);
            }, i + "").start();
        }
    }

解决方案二:CopyOnWriteArraySet方法

public static void main(String[] args) {
    CopyOnWriteArraySet<String> hashSet = new CopyOnWriteArraySet<>();
    for (int i = 0; i < 30; i++) {
        new Thread(() -> {
            hashSet.add(UUID.randomUUID().toString().substring(0, 8));
            System.out.println(hashSet);
        }, i + "").start();
    }
}

5、关于 HashMap线程不安全的问题

 public static void main(String[] args) {
        HashMap<String, String> hashMap = new HashMap<>();
        for (int i = 0; i < 30; i++) {
            String key = String.valueOf(i);
            new Thread(() -> {
                hashMap.put(key, UUID.randomUUID().toString().substring(0, 8));
                System.out.println(hashMap);
            }, i + "").start();
        }
    }

报错:java.util.ConcurrentModificationException并发修改异常

解决方案: 使用ConcurrentHashMap

    public static void main(String[] args) {
        ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
        for (int i = 0; i < 30; i++) {
            String key = String.valueOf(i);
            new Thread(() -> {
                hashMap.put(key, UUID.randomUUID().toString().substring(0, 8));
                System.out.println(hashMap);
            }, i + "").start();
        }
    }

6、可重入锁 synchronizedLock

synchronized

public static void main(String[] args) {
        Object o = new Object();
        new Thread(()->{
            synchronized (o) {
                System.out.println(Thread.currentThread().getName() + " 外层");
                synchronized (o){
                    System.out.println(Thread.currentThread().getName() + " 中层");
                    synchronized (o) {
                        System.out.println(Thread.currentThread().getName() + " 内层");
                    }
                }

            }
        },"线程A").start();
    }

//输出
线程A 外层
线程A 中层
线程A 内层

Lock

正确演示

public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();
        try {
            lock.lock();
            System.out.println("外层");
            try {
                lock.lock();
                System.out.println("内层");
            } finally {
                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
    }

错误演示,假如某一个锁只加锁,不释放锁

 public static void main(String[] args) {
        ReentrantLock lock = new ReentrantLock();

        new Thread(() -> {
        try {
            lock.lock();
            System.out.println(Thread.currentThread().getName()+"外层");
            try {
                lock.lock();
                System.out.println(Thread.currentThread().getName()+"内层");
            } finally {
//                lock.unlock();
            }
        } finally {
            lock.unlock();
        }
       },"线程一").start();

        new Thread(()->{
            try {
                lock.lock();
                System.out.println("第二个线程所输出的内容");
            }finally {
                lock.unlock();
            }
        },"线程二").start();
    }
//输出结果:会发现第二个线程的内容无法输出,而且程序并没停止。原因,第一个没有把锁释放,第线程二阻塞
线程一外层
线程一内层

7、制作一个死锁

public static void main(String[] args) {
        Object resource1 = new Object();
        Object resource2 = new Object();

        new Thread(()->{
            synchronized (resource1){
                System.out.println(Thread.currentThread().getName() + "获取resource1,尝试获取resource2");
                try {
                    Thread.sleep(1000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }

                synchronized (resource2) {
                    System.out.println(Thread.currentThread().getName()+"获取到resource2");
                }
            }
        },"线程一").start();
        new Thread(()->{
            synchronized (resource2){
                System.out.println(Thread.currentThread().getName() + "获取resource2,尝试获取resource1");
                synchronized (resource1){
                    System.out.println(Thread.currentThread().getName()+"获取到resource1");
                }
            }
        },"线程二").start();
    }

8、Callable接口

 // 实现 Callable接口
    static class Call implements Callable<Integer>{
        @Override
        public Integer call() throws Exception {
            return 200;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 使用lambda表达式创建一个futureTask
        FutureTask<Integer> futureTask2 = new FutureTask<>(() -> {
            return 250;
        });
        FutureTask<Integer> futureTask1 = new FutureTask<>(new Call());
        // 开启一个线程一
        new Thread(futureTask1,"线程一").start();
        // 开启一个线程二
        new Thread(futureTask2,"线程二").start();
        // 判断线程是否执行完毕
        while (!futureTask2.isDone()) {
            System.out.println("线程还没结束,wait....");
        }
        // 获取线程二的返回值
        Integer integer = futureTask2.get();
        // 获取线程一的返回值
        Integer integer1 = futureTask1.get();
        System.out.println("线程一"+integer1);
        System.out.println("线程二"+integer);

    }

9、辅助类

**CountDownLatch**一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程等待

public static void main(String[] args) throws InterruptedException {
    	// 声明一组有六个线程
        CountDownLatch countDownLatch = new CountDownLatch(6);
        for (int i = 1; i <= 6; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "人离开教室");
                // 每执行完一个进程,进程数减一
                countDownLatch.countDown();
            }, String.valueOf(i)).start();
        }
    	// 六个进程没执行完之前等待
        countDownLatch.await();
        System.out.println(Thread.currentThread().getName() + "班长锁门");
    }

CyclicBarrier一个同步辅助类,它允许一组线程互相等待,直到达到某一个公公屏障点,在设计一组固定大小的线程的程序中,这些线程必须不时的互相等待,此时 CyclicBarrier很有用,因为该 Barrier在释放等待线程后可以重用,所以称他为循环的 Barrier

 public static void main(String[] args)  {
        final int NUMBER = 7;
        CyclicBarrier barrier = new CyclicBarrier(NUMBER, () -> {
            System.out.println("七颗龙珠,召唤神龙");
        });

        for (int i = 1; i <= 7; i++) {
            new Thread(() -> {
                System.out.println(Thread.currentThread().getName() + "颗龙珠被集齐");
                // 等待
                try {
                    barrier.await();
                } catch (InterruptedException | BrokenBarrierException e) {
                    e.printStackTrace();
                }
            }, String.valueOf(i)).start();
        }

    }

Semaphore一个计数信号量,从概念上讲,信号量维护了一个许可集,如有必要,在许可可用前会阻塞每一个 acquire(),然后在获取该许可,每个 release()添加一个许可,从而可能释放一个正在阻塞的获取者,但是,不使用实际许可对象, Semaphore只对可用许可的号码进行计数,并采取相应的行动。

public static void main(String[] args) {

        // 设置许可数量
        Semaphore semaphore = new Semaphore(3);

        for (int i = 1; i <= 6; i++) {
            new Thread(()->{
                try {
                    // 抢占
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + "抢占了车位");
                    // 设置随机抢占时间0~5
                    TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    System.out.println(Thread.currentThread().getName() + "----释放了车位");
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }finally {
                    // 释放
                    semaphore.release();
                }
            },String.valueOf(i)).start();
        }
    }

10、读写锁案例

读写锁:一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享。

缺点:

  • 造成锁饥饿,一直读,没有写操作。
  • 读的时候,不能写,只有读完之后才可以写,写操作可以读

未加锁前

class MyCache{
    public volatile HashMap<String, Object> map = new HashMap<>();

    // 写数据
    public void put(String key, Object value) {
        System.out.println(Thread.currentThread().getName() + "正在写数据" + key);

        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        map.put(key, value);
        System.out.println(Thread.currentThread().getName()+"--写完了");
    }

    // 读数据
    public Object get(String key) {
        System.out.println(Thread.currentThread().getName() + "正在读数据" + key);
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        Object o = map.get(key);
        System.out.println(Thread.currentThread().getName()+"读数据完成");
        return o;
    }
}
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            new Thread(() -> {
                myCache.put(finalI + "", finalI);
            }, String.valueOf(i)).start();
        }

        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            new Thread(() -> {
                myCache.get(finalI + "");
            }, String.valueOf(i)).start();
        }

    }
}
// 输出结果如下
1正在写数据1
4正在写数据4
2正在写数据2
3正在写数据3
5正在写数据5
2正在读数据2
3正在读数据3
1正在读数据1
4正在读数据4
5正在读数据5
5--写完了
5读数据完成
4读数据完成
3读数据完成
4--写完了
1--写完了
2读数据完成
1读数据完成
2--写完了
3--写完了

从输出结果我们可以看到,有些数据还没有写入完成,就已经被读取了,显然读到的数据是空,为了防止此类事件的发生,加入读写锁如下

import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

class MyCache{
    public volatile HashMap<String, Object> map = new HashMap<>();
    // 创建读写锁对象
    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();

    // 写数据
    public void put(String key, Object value) {

        // 添加写锁
        readWriteLock.writeLock().lock();
        try {
            System.out.println(Thread.currentThread().getName() + "正在写数据" + key);
            TimeUnit.SECONDS.sleep(1);
            map.put(key, value);
            System.out.println(Thread.currentThread().getName() + "--写完了");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 释放写锁
            readWriteLock.writeLock().unlock();
        }
    }

    // 读数据
    public Object get(String key) {
        // 添加读锁
        readWriteLock.readLock().lock();
        Object o = null;
        try {
            System.out.println(Thread.currentThread().getName() + "正在读数据" + key);
            TimeUnit.SECONDS.sleep(1);
            o = map.get(key);
            System.out.println(Thread.currentThread().getName() + "读数据完成");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }finally {
            // 释放读锁
            readWriteLock.readLock().unlock();
        }
        return o;
    }
}
public class ReadWriteLockDemo {
    public static void main(String[] args) {
        MyCache myCache = new MyCache();
        // 创建线程写数据
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            new Thread(() -> {
                myCache.put(finalI + "", finalI);
            }, String.valueOf(i)).start();
        }
		// 创建线程读数据
        for (int i = 1; i <= 5; i++) {
            int finalI = i;
            new Thread(() -> {
                myCache.get(finalI + "");
            }, String.valueOf(i)).start();
        }

    }
}
// 输出结果如下
2正在写数据2
2--写完了
1正在写数据1
1--写完了
3正在写数据3
3--写完了
4正在写数据4
4--写完了
5正在写数据5
5--写完了
1正在读数据1
2正在读数据2
3正在读数据3
4正在读数据4
5正在读数据5
5读数据完成
1读数据完成
4读数据完成
2读数据完成
3读数据完成

从结果我们不难看出,都是先写入完成后才能读数据,而且,从结果我们也可以看出,对于写锁,是排他锁,同一时刻,只能有一个线程对其进行写操作,而对于读取数据的话,是共享锁,同一时刻,可以有多个线程对其进行读操作。

11、阻塞队列

ArrayBlockingQueue演示

package com.tzf.queue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueDemo {

    public static void main(String[] args) throws InterruptedException {
        ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
       /* System.out.println(blockingQueue.add("a"));// true
        System.out.println(blockingQueue.add("b"));// true
        System.out.println(blockingQueue.add("c"));// true
//        System.out.println(blockingQueue.add("a"));// 报异常:Queue full

        System.out.println(blockingQueue.element());// 返回队列中的第一个元素,但元素并不出队列
        System.out.println(blockingQueue.remove());// 删除队列中的头一个元素
        */
        /*                以上是出错会报异常             */


        // .offer()向队列中插入一个元素
       /*
        System.out.println(blockingQueue.offer("a"));// true
        System.out.println(blockingQueue.offer("b"));// true
        System.out.println(blockingQueue.offer("c"));// true
        // System.out.println(blockingQueue.offer("d"));// false 队列满了的话会返回false
        System.out.println(blockingQueue.poll());// 从队列中删除元素,当队列中没有元素是返回null
       */


        // put()向队列中插入元素,若队列满了,则阻塞,直到队列有空位置,再插入
        blockingQueue.put("a");
        blockingQueue.put("a");
        blockingQueue.put("a");
        blockingQueue.offer("a", 3L, TimeUnit.SECONDS);// 此语句阻塞三秒后自动放弃插入数据
//      blockingQueue.put("a");// 此次添加将会阻塞

        // take() 从队列中取出一个元素。若队列中没有元素可取,则阻塞,直到有元素时再取出元素
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());
        System.out.println(blockingQueue.take());// 此次阻塞,等待元素插入队列
    }
}

12、线程池

根据阿里巴巴开发手册规定,不建议用如下方法创建线程(因为队列长度最大值为 Integer.MAX_VALUE,可能会堆积大量请求,从而导致OOM),使用自定义线程池,

		// 创建固定大小的线程池
        ExecutorService executorService = Executors.newFixedThreadPool(8);

        // 创建单个线程
        ExecutorService executorService1 = Executors.newSingleThreadExecutor();
        
        // 线程池大小不确定,根据实际情况改变大小 
        ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
        

线程池底层ThreadPoolExecutor的七个参数介绍

ThreadPoolExecutor(
   							int corePoolSize, // 核心线程数
                              int maximumPoolSize,// 最大线程数
                              long keepAliveTime, // 线程存活时间
                              TimeUnit unit,      // 存活时间的单位
                              BlockingQueue<Runnable> workQueue, // 阻塞队列的大小
                              ThreadFactory threadFactory,       // 线程工厂,用于创建线程
                              RejectedExecutionHandler handler)   // 拒绝策略

有四种拒绝策略:

  • AbortPolicy (默认):直接抛出 RejectedExecutionException异常阻止系统正常运行
  • CallerRunsPolicy:”调用者运行“一种调节机制,该策略不会抛弃任务,也不会抛出异常,而是将某些任务返回给调用者,从而降低新任务流量
  • DiscardOldestPolicy:抛弃队列中等待最久的任务,然后把当前任务加入到队列中,尝试再次提交事务
  • DiscardPolicy:该策略默默丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种策略

自定义线程池

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
                3,// 核心线程数
                5,// 最大线程数
                3L,// 线程存活时间
                TimeUnit.SECONDS,// 存活时间的单位
                new ArrayBlockingQueue<>(3), // 阻塞队列的大小
                Executors.defaultThreadFactory(),// 线程工厂,y于创建线程
                new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
    }

13、Fork/Join 框架

Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成 若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进 行 join 汇总。

image-20220324222808339

Fork/Join 框架与线程池的区别

采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。

相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中, 如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理 该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了 线程的等待时间,提高了性能

package com.tzf.forkjoin;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;

class MyTask extends RecursiveTask<Integer> {
    // 拆分插值不能大于10,
    private static final Integer VALUE = 10;

    private int begin;
    private int end;
    private int result;

    public MyTask(int begin, int end) {
        this.begin = begin;
        this.end = end;
    }

    @Override
    protected Integer compute() {
        if ((end - begin) <= VALUE) {
            for (int i = begin; i <= end; i++) {
                result += i;
            }
        } else {
            int middle = begin + (end - begin) / 2;
            // 拆分左边
            MyTask myTask = new MyTask(begin, middle);
            // 拆分右边
            MyTask myTask1 = new MyTask(middle+1, end);

            // 调用方法拆分
            myTask.fork();
            myTask1.fork();

            // 合并结果
            result = myTask.join() + myTask1.join();
        }
        return result;
    }
}
public class ForkJoinDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        MyTask myTask = new MyTask(0, 100);
        // 创建分支合并对象
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        ForkJoinTask<Integer> task = forkJoinPool.submit(myTask);
        // 获取最终合并的结果
        Integer integer = task.get();
        System.out.println(integer);
        // 关闭池对象
        forkJoinPool.shutdown();
    }
}

举报

相关推荐

JUC简单学习笔记

JUC多线程学习笔记

JUC笔记

JUC学习

狂神说JUC并发编程学习笔记

0 条评论