1、关于 wait()
方法虚假唤醒的问题
wait()
方法一个特点就是在哪里沉睡就会在哪里醒来,所以,如果wait()
方法没加在循环里,就会出现一个只有第一次判断生效,第二次则不进行判断,直接往下执行造成虚假唤醒的情况,从而导致数据出错。下图中,把 if
改为 while
解决虚假唤醒问题
2、线程的定制化通信
// 提供功能
class Th {
private int flag = 1;
ReentrantLock lock = new ReentrantLock();
// 提供三个独立的Condition
Condition c1 = lock.newCondition();
Condition c2 = lock.newCondition();
Condition c3 = lock.newCondition();
public void print5() {
lock.lock();
try {
while (flag != 1) {
c1.await();
}
for (int i = 1; i <= 5; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
flag = 2;
c2.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print10() {
lock.lock();
try {
while (flag != 2) {
c2.await();
}
for (int i = 1; i <= 10; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
flag = 3;
c3.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void print15() {
lock.lock();
try {
while (flag != 3) {
c3.await();
}
for (int i = 1; i <= 15; i++) {
System.out.println(Thread.currentThread().getName() + "::" + i);
}
flag = 1;
c1.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}
public class dd {
public static void main(String[] args) {
Th th = new Th();
new Thread(() -> {
for (int i = 0; i < 5; i++) {
th.print5();
}
}, "C1").start();
new Thread(() -> {
for (int i = 0; i < 10; i++) {
th.print10();
}
}, "C2").start();
new Thread(() -> {
for (int i = 0; i < 15; i++) {
th.print15();
}
}, "C3").start();
}
}
3、关于ArrayList()
线程不安全问题的解决方法
线程不安全条件下的并发操作
public static void main(String[] args) {
ArrayList<String> arrayList = new ArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(()->{
arrayList.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(arrayList);
},i+"").start();
}
}
经测试,不能再 @Test
下测试,不会有任何输出。通过上述测试,会报 java.util.ConcurrentModificationException
异常
解决方案一:使用 Vector
。List的古老实现类,JDK1.0引入
public static void main(String[] args) {
List<String> arrayList = new Vector<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
arrayList.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(arrayList);
}, i + "").start();
}
}
解决方案二:使用 Collections
工具类
public static void main(String[] args) {
List<String> arrayList = Collections.synchronizedList(new ArrayList<String>());
for (int i = 0; i < 30; i++) {
new Thread(() -> {
arrayList.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(arrayList);
}, i + "").start();
}
}
解决方案三:CopyOnWriteArrayList
写时复制技术
原理图:
public static void main(String[] args) {
CopyOnWriteArrayList<String> arrayList = new CopyOnWriteArrayList<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
arrayList.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(arrayList);
}, i + "").start();
}
}
4、HashSet()
的线程不安全问题
public static void main(String[] args) {
HashSet<String> hashSet = new HashSet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
hashSet.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(hashSet);
}, i + "").start();
}
}
报错:java.util.ConcurrentModificationException
并发修改异常
解决方案一:使用 Collections
工具类
public static void main(String[] args) {
Set<String> hashSet = Collections.synchronizedSet(new HashSet<String>());
for (int i = 0; i < 30; i++) {
new Thread(() -> {
hashSet.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(hashSet);
}, i + "").start();
}
}
解决方案二:CopyOnWriteArraySet
方法
public static void main(String[] args) {
CopyOnWriteArraySet<String> hashSet = new CopyOnWriteArraySet<>();
for (int i = 0; i < 30; i++) {
new Thread(() -> {
hashSet.add(UUID.randomUUID().toString().substring(0, 8));
System.out.println(hashSet);
}, i + "").start();
}
}
5、关于 HashMap
线程不安全的问题
public static void main(String[] args) {
HashMap<String, String> hashMap = new HashMap<>();
for (int i = 0; i < 30; i++) {
String key = String.valueOf(i);
new Thread(() -> {
hashMap.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(hashMap);
}, i + "").start();
}
}
报错:java.util.ConcurrentModificationException
并发修改异常
解决方案: 使用ConcurrentHashMap
public static void main(String[] args) {
ConcurrentHashMap<String, String> hashMap = new ConcurrentHashMap<>();
for (int i = 0; i < 30; i++) {
String key = String.valueOf(i);
new Thread(() -> {
hashMap.put(key, UUID.randomUUID().toString().substring(0, 8));
System.out.println(hashMap);
}, i + "").start();
}
}
6、可重入锁 synchronized
和 Lock
synchronized
public static void main(String[] args) {
Object o = new Object();
new Thread(()->{
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 外层");
synchronized (o){
System.out.println(Thread.currentThread().getName() + " 中层");
synchronized (o) {
System.out.println(Thread.currentThread().getName() + " 内层");
}
}
}
},"线程A").start();
}
//输出
线程A 外层
线程A 中层
线程A 内层
Lock
正确演示
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
try {
lock.lock();
System.out.println("外层");
try {
lock.lock();
System.out.println("内层");
} finally {
lock.unlock();
}
} finally {
lock.unlock();
}
}
错误演示,假如某一个锁只加锁,不释放锁
public static void main(String[] args) {
ReentrantLock lock = new ReentrantLock();
new Thread(() -> {
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+"外层");
try {
lock.lock();
System.out.println(Thread.currentThread().getName()+"内层");
} finally {
// lock.unlock();
}
} finally {
lock.unlock();
}
},"线程一").start();
new Thread(()->{
try {
lock.lock();
System.out.println("第二个线程所输出的内容");
}finally {
lock.unlock();
}
},"线程二").start();
}
//输出结果:会发现第二个线程的内容无法输出,而且程序并没停止。原因,第一个没有把锁释放,第线程二阻塞
线程一外层
线程一内层
7、制作一个死锁
public static void main(String[] args) {
Object resource1 = new Object();
Object resource2 = new Object();
new Thread(()->{
synchronized (resource1){
System.out.println(Thread.currentThread().getName() + "获取resource1,尝试获取resource2");
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
synchronized (resource2) {
System.out.println(Thread.currentThread().getName()+"获取到resource2");
}
}
},"线程一").start();
new Thread(()->{
synchronized (resource2){
System.out.println(Thread.currentThread().getName() + "获取resource2,尝试获取resource1");
synchronized (resource1){
System.out.println(Thread.currentThread().getName()+"获取到resource1");
}
}
},"线程二").start();
}
8、Callable接口
// 实现 Callable接口
static class Call implements Callable<Integer>{
@Override
public Integer call() throws Exception {
return 200;
}
}
public static void main(String[] args) throws ExecutionException, InterruptedException {
// 使用lambda表达式创建一个futureTask
FutureTask<Integer> futureTask2 = new FutureTask<>(() -> {
return 250;
});
FutureTask<Integer> futureTask1 = new FutureTask<>(new Call());
// 开启一个线程一
new Thread(futureTask1,"线程一").start();
// 开启一个线程二
new Thread(futureTask2,"线程二").start();
// 判断线程是否执行完毕
while (!futureTask2.isDone()) {
System.out.println("线程还没结束,wait....");
}
// 获取线程二的返回值
Integer integer = futureTask2.get();
// 获取线程一的返回值
Integer integer1 = futureTask1.get();
System.out.println("线程一"+integer1);
System.out.println("线程二"+integer);
}
9、辅助类
**CountDownLatch
**一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程等待
public static void main(String[] args) throws InterruptedException {
// 声明一组有六个线程
CountDownLatch countDownLatch = new CountDownLatch(6);
for (int i = 1; i <= 6; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "人离开教室");
// 每执行完一个进程,进程数减一
countDownLatch.countDown();
}, String.valueOf(i)).start();
}
// 六个进程没执行完之前等待
countDownLatch.await();
System.out.println(Thread.currentThread().getName() + "班长锁门");
}
CyclicBarrier
一个同步辅助类,它允许一组线程互相等待,直到达到某一个公公屏障点,在设计一组固定大小的线程的程序中,这些线程必须不时的互相等待,此时 CyclicBarrier
很有用,因为该 Barrier
在释放等待线程后可以重用,所以称他为循环的 Barrier
public static void main(String[] args) {
final int NUMBER = 7;
CyclicBarrier barrier = new CyclicBarrier(NUMBER, () -> {
System.out.println("七颗龙珠,召唤神龙");
});
for (int i = 1; i <= 7; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + "颗龙珠被集齐");
// 等待
try {
barrier.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
}, String.valueOf(i)).start();
}
}
Semaphore
一个计数信号量,从概念上讲,信号量维护了一个许可集,如有必要,在许可可用前会阻塞每一个 acquire()
,然后在获取该许可,每个 release()
添加一个许可,从而可能释放一个正在阻塞的获取者,但是,不使用实际许可对象, Semaphore
只对可用许可的号码进行计数,并采取相应的行动。
public static void main(String[] args) {
// 设置许可数量
Semaphore semaphore = new Semaphore(3);
for (int i = 1; i <= 6; i++) {
new Thread(()->{
try {
// 抢占
semaphore.acquire();
System.out.println(Thread.currentThread().getName() + "抢占了车位");
// 设置随机抢占时间0~5
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName() + "----释放了车位");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放
semaphore.release();
}
},String.valueOf(i)).start();
}
}
10、读写锁案例
读写锁:一个资源可以被多个读线程访问,或者可以被一个写线程访问,但是不能同时存在读写线程,读写互斥,读读共享。
缺点:
- 造成锁饥饿,一直读,没有写操作。
- 读的时候,不能写,只有读完之后才可以写,写操作可以读
未加锁前
class MyCache{
public volatile HashMap<String, Object> map = new HashMap<>();
// 写数据
public void put(String key, Object value) {
System.out.println(Thread.currentThread().getName() + "正在写数据" + key);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
map.put(key, value);
System.out.println(Thread.currentThread().getName()+"--写完了");
}
// 读数据
public Object get(String key) {
System.out.println(Thread.currentThread().getName() + "正在读数据" + key);
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
Object o = map.get(key);
System.out.println(Thread.currentThread().getName()+"读数据完成");
return o;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(() -> {
myCache.put(finalI + "", finalI);
}, String.valueOf(i)).start();
}
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(() -> {
myCache.get(finalI + "");
}, String.valueOf(i)).start();
}
}
}
// 输出结果如下
1正在写数据1
4正在写数据4
2正在写数据2
3正在写数据3
5正在写数据5
2正在读数据2
3正在读数据3
1正在读数据1
4正在读数据4
5正在读数据5
5--写完了
5读数据完成
4读数据完成
3读数据完成
4--写完了
1--写完了
2读数据完成
1读数据完成
2--写完了
3--写完了
从输出结果我们可以看到,有些数据还没有写入完成,就已经被读取了,显然读到的数据是空,为了防止此类事件的发生,加入读写锁如下
import java.util.HashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
class MyCache{
public volatile HashMap<String, Object> map = new HashMap<>();
// 创建读写锁对象
private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
// 写数据
public void put(String key, Object value) {
// 添加写锁
readWriteLock.writeLock().lock();
try {
System.out.println(Thread.currentThread().getName() + "正在写数据" + key);
TimeUnit.SECONDS.sleep(1);
map.put(key, value);
System.out.println(Thread.currentThread().getName() + "--写完了");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放写锁
readWriteLock.writeLock().unlock();
}
}
// 读数据
public Object get(String key) {
// 添加读锁
readWriteLock.readLock().lock();
Object o = null;
try {
System.out.println(Thread.currentThread().getName() + "正在读数据" + key);
TimeUnit.SECONDS.sleep(1);
o = map.get(key);
System.out.println(Thread.currentThread().getName() + "读数据完成");
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
// 释放读锁
readWriteLock.readLock().unlock();
}
return o;
}
}
public class ReadWriteLockDemo {
public static void main(String[] args) {
MyCache myCache = new MyCache();
// 创建线程写数据
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(() -> {
myCache.put(finalI + "", finalI);
}, String.valueOf(i)).start();
}
// 创建线程读数据
for (int i = 1; i <= 5; i++) {
int finalI = i;
new Thread(() -> {
myCache.get(finalI + "");
}, String.valueOf(i)).start();
}
}
}
// 输出结果如下
2正在写数据2
2--写完了
1正在写数据1
1--写完了
3正在写数据3
3--写完了
4正在写数据4
4--写完了
5正在写数据5
5--写完了
1正在读数据1
2正在读数据2
3正在读数据3
4正在读数据4
5正在读数据5
5读数据完成
1读数据完成
4读数据完成
2读数据完成
3读数据完成
从结果我们不难看出,都是先写入完成后才能读数据,而且,从结果我们也可以看出,对于写锁,是排他锁,同一时刻,只能有一个线程对其进行写操作,而对于读取数据的话,是共享锁,同一时刻,可以有多个线程对其进行读操作。
11、阻塞队列
ArrayBlockingQueue
演示
package com.tzf.queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;
public class BlockingQueueDemo {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<>(3);
/* System.out.println(blockingQueue.add("a"));// true
System.out.println(blockingQueue.add("b"));// true
System.out.println(blockingQueue.add("c"));// true
// System.out.println(blockingQueue.add("a"));// 报异常:Queue full
System.out.println(blockingQueue.element());// 返回队列中的第一个元素,但元素并不出队列
System.out.println(blockingQueue.remove());// 删除队列中的头一个元素
*/
/* 以上是出错会报异常 */
// .offer()向队列中插入一个元素
/*
System.out.println(blockingQueue.offer("a"));// true
System.out.println(blockingQueue.offer("b"));// true
System.out.println(blockingQueue.offer("c"));// true
// System.out.println(blockingQueue.offer("d"));// false 队列满了的话会返回false
System.out.println(blockingQueue.poll());// 从队列中删除元素,当队列中没有元素是返回null
*/
// put()向队列中插入元素,若队列满了,则阻塞,直到队列有空位置,再插入
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.put("a");
blockingQueue.offer("a", 3L, TimeUnit.SECONDS);// 此语句阻塞三秒后自动放弃插入数据
// blockingQueue.put("a");// 此次添加将会阻塞
// take() 从队列中取出一个元素。若队列中没有元素可取,则阻塞,直到有元素时再取出元素
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());
System.out.println(blockingQueue.take());// 此次阻塞,等待元素插入队列
}
}
12、线程池
根据阿里巴巴开发手册规定,不建议用如下方法创建线程(因为队列长度最大值为 Integer.MAX_VALUE
,可能会堆积大量请求,从而导致OOM
),使用自定义线程池,
// 创建固定大小的线程池
ExecutorService executorService = Executors.newFixedThreadPool(8);
// 创建单个线程
ExecutorService executorService1 = Executors.newSingleThreadExecutor();
// 线程池大小不确定,根据实际情况改变大小
ExecutorService cachedThreadPool = Executors.newCachedThreadPool();
线程池底层ThreadPoolExecutor
的七个参数介绍
ThreadPoolExecutor(
int corePoolSize, // 核心线程数
int maximumPoolSize,// 最大线程数
long keepAliveTime, // 线程存活时间
TimeUnit unit, // 存活时间的单位
BlockingQueue<Runnable> workQueue, // 阻塞队列的大小
ThreadFactory threadFactory, // 线程工厂,用于创建线程
RejectedExecutionHandler handler) // 拒绝策略
有四种拒绝策略:
AbortPolicy
(默认):直接抛出RejectedExecutionException
异常阻止系统正常运行CallerRunsPolicy
:”调用者运行“一种调节机制,该策略不会抛弃任务,也不会抛出异常,而是将某些任务返回给调用者,从而降低新任务流量DiscardOldestPolicy
:抛弃队列中等待最久的任务,然后把当前任务加入到队列中,尝试再次提交事务DiscardPolicy
:该策略默默丢弃无法处理的任务,不予任何处理也不抛出异常,如果允许任务丢失,这是最好的一种策略
自定义线程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
3,// 核心线程数
5,// 最大线程数
3L,// 线程存活时间
TimeUnit.SECONDS,// 存活时间的单位
new ArrayBlockingQueue<>(3), // 阻塞队列的大小
Executors.defaultThreadFactory(),// 线程工厂,y于创建线程
new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
}
13、Fork/Join 框架
Fork/Join 框架:就是在必要的情况下,将一个大任务,进行拆分(fork)成 若干个小任务(拆到不可再拆时),再将一个个的小任务运算的结果进 行 join 汇总。
Fork/Join 框架与线程池的区别
采用 “工作窃取”模式(work-stealing): 当执行新的任务时它可以将其拆分分成更小的任务执行,并将小任务加到线程队列中,然后再从一个随机线程的队列中偷一个并把它放在自己的队列中。
相对于一般的线程池实现,fork/join框架的优势体现在对其中包含的任务的处理方式上,在一般的线程池中,如果一个线程正在执行的任务由于某些原因无法继续运行,那么该线程会处于等待状态。而在fork/join框架实现中, 如果某个子问题由于等待另外一个子问题的完成而无法继续运行。那么处理 该子问题的线程会主动寻找其他尚未运行的子问题来执行.这种方式减少了 线程的等待时间,提高了性能
package com.tzf.forkjoin;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveTask;
class MyTask extends RecursiveTask<Integer> {
// 拆分插值不能大于10,
private static final Integer VALUE = 10;
private int begin;
private int end;
private int result;
public MyTask(int begin, int end) {
this.begin = begin;
this.end = end;
}
@Override
protected Integer compute() {
if ((end - begin) <= VALUE) {
for (int i = begin; i <= end; i++) {
result += i;
}
} else {
int middle = begin + (end - begin) / 2;
// 拆分左边
MyTask myTask = new MyTask(begin, middle);
// 拆分右边
MyTask myTask1 = new MyTask(middle+1, end);
// 调用方法拆分
myTask.fork();
myTask1.fork();
// 合并结果
result = myTask.join() + myTask1.join();
}
return result;
}
}
public class ForkJoinDemo {
public static void main(String[] args) throws ExecutionException, InterruptedException {
MyTask myTask = new MyTask(0, 100);
// 创建分支合并对象
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<Integer> task = forkJoinPool.submit(myTask);
// 获取最终合并的结果
Integer integer = task.get();
System.out.println(integer);
// 关闭池对象
forkJoinPool.shutdown();
}
}