目录
一、基础知识
原子性:一个操作或者多个操作 要么全部执行并且执行过程中不会被任何因素打断 要么全部不执行。
可见性:多个线程同时访问一个变量的时候,其中一个线程修改了变量的值,那么其他线程立即能够看到修改后的值
有序性:程序执行的顺序按照代码的先后顺序执行。
线程之间如何通信?
命令式编程中线程的通信方式有 共享内存 、消息传递
在共享内存模型里线程之间共享内存的公共状态、在消息传递模型里线程之间通过消息发送接收的形式来进行通信。Java使用共享内存模型进行线程通信。
Java内存模型
Java内存模型(Java Memory Model,JMM)用于屏蔽掉各种计算机硬件和操作系统的内存访问差异,以实现让Java程序能够在不同的平台都能达到一致的并发效果。JMM规定了JAVA虚拟机和计算机内存是如何协同工作的:规定了一个线程如何和何时可以看到由其他线程修改过后的共享变量的值,以及在必须时如何同步的访问共享变量。
- CPU寄存器:每个CPU都包含一系列的寄存器,它们是CPU内内存的基础,CUP在寄存器上的执行速度远远大于在主存上执行速度。因为CPU访问寄存器的速度远大于主存。
- 高速缓存cache:由于计算机的存储设备与处理器的运算速度之间有几个数量级的差距,为了弥补之间的差距,不得不加入一层读写速度尽可能接近处理的运算速度的高速缓存(Cache)来作为内存与处理器之间的缓冲:讲运算需要使用到的数据复制到缓存中,让运算能快速进行。当运算结束后再从缓存同步到内存中,这样处理器就无需等待缓慢的内存读写了。CPU访问缓存层速度快于访问内存,但比访问寄存器要慢。每个CUP可能有一个或者多层缓存,某一时刻,一个或者多个缓存行(cache lines)可能被读到缓存,一个或多个可能再次被刷回主存。
- 内存:一个计算机还包含一个主存,所有的CPU都可以访问主存。主存通常比CPU的缓存大得多。
运行原理:当一个CPU需要读取主存时,它会将主存的部分读到CPU缓存中,它甚至可能将缓存中的部分内容读取到它的寄存器里,然后在寄存器中执行操作。当CPU需要将结果写回主存中去时,它会将内部寄存器的值刷新到缓存中去,然后再将值刷回主存。
多线程环境下一些问题
- 缓存一致性:在多处理器系统中,每个处理器都有自己的高速缓存,而它们又共享同一主存。基于高速缓存的存储交互很好地解决了处理器与内存的速度矛盾,但是也引入新的问题。缓存一致性。当多个处理器的运算任务都涉及到同一块主内存区域时,将可能导致各自的缓存数据不一致的情况。那么此时该以哪个缓存数据为准就是一个问题。为了解决一致性问题,需要各个处理器访问缓存时都遵循一些协议,在读写的时候根据协议来进行操作。
- 指令重排序:为了使得处理器内部的运算单元能尽量被充分利用,处理器可能会对输入代码进行乱序执行(Out-Of-Order Execution)优化,处理器会在计算之后将乱序执行的结果重组,保证该结果与顺序执行的结果是一致的,但并不保证程序中各个语句计算的先后顺序与输入代码中的顺序一致。因此,如果存在一个计算任务依赖另一个计算任务的中间结果,那么其顺序性并不能靠代码的先后顺序来保证。与处理器的乱序执行优化类似,Java 虚拟机的即时编译器中也有类似的指令重排序(Instruction Reorder)优化
Java内存模型(JMM)
Java 堆和方法区是多个线程共享的数据区域。多个线程可以操作堆和方法区中的同一个数据。局部变量,方法定义参数和异常处理参数不会在线程之间共享,它们不会有内存可见性问题,也不受内存模型影响。
Java 内存模型的英文名称为 Java Memory Model(JMM),其并不像 JVM 内存结构一样真实存在,而是一个抽象的概念。
从抽象的角度来看,JMM 定义了线程和主内存之间的抽象关系:
- 线程之间的共享变量存储在主内存(Main Memory)中
- 每个线程都有一个私有的本地内存(Local Memory),本地内存是 JMM 的一个抽象概念,并不真实存在,它涵盖了缓存、写缓冲区、寄存器以及其他的硬件和编译器优化。本地内存中存储了该线程以读/写共享变量的拷贝副本。
- 从更低的层次来说,主内存就是硬件的内存,而为了获取更好的运行速度,虚拟机及硬件系统可能会让工作内存优先存储于寄存器和高速缓存中。
- Java 内存模型中的线程的工作内存(working memory)是 cpu 的寄存器和高速缓存的抽象描述。而 JVM 的静态内存储模型(JVM 内存模型)只是一种对内存的物理划分而已,它只局限在内存,而且只局限在 JVM 的内存。
JMM规定主内存与工作内存交互协议
关于主内存与工作内存之间的具体交互协议,即一个变量如何从主内存拷贝到工作内存、如何从工作内存同步到主内存之间的实现细节。JMM定义了八种操作来完成:
- 1、lock 作用于主内存的变量,把一个变量标识为一条线程独占状态
- 2、unlock 作用于主内存的变量,解锁上一条
- 3、read 作用于主内存变量,把一个变量值从主内存传到工作内存中,以便后面load
- 4、load 作用于工作内存的变量,把read操作从主内存得到的变量值放入工作内存的变量副本中
- 5、use 作用于工作内存的变量,把工作内存中的一个变量值传递给执行引擎,每当虚拟机遇到一
- 个需要使用变量的值的字节码指令时将会执行这个操作。
- 6、assign 作用于工作内存的变量,把从执行引擎中收到的值赋值给工作内存中的变量。
- 7、store 作用于工作内存的变量,把工作内存中的一个变量的值传到主内存中,以便write
- 8、write 作用于主内存的变量,把store操作传来的变量值传送到主内存的变量中。
以上操作还必须满足一下规则:
- 1、如果要把一个变量从主内存中复制到工作内存,就需要按顺寻地执行 read 和 load 操作, 如果把变量从工作内存中同步回主内存中,就要按顺序地执行 store 和 write 操作。但 JMM 只要求上述操作必须按顺序执行,而没有保证必须是连续执行。
- 2、不允许 read 和 load、store 和 write 操作之一单独出现
- 3、不允许一个线程丢弃它的最近 assign 的操作,即变量在工作内存中改变了之后必须同步到主内存中。
- 4、不允许一个线程无原因地(没有发生过任何 assign 操作)把数据从工作内存同步回主内存中。
- 5、一个新的变量只能在主内存中诞生,不允许在工作内存中直接使用一个未被初始化(load 或 assign)的变量。即就是对一个变量实施 use 和 store 操作之前,必须先执行过了 assign 和 load 操作。
- 6、一个变量在同一时刻只允许一条线程对其进行 lock 操作,但 lock 操作可以被同一条线程重复执行多次,多次执行 lock 后,只有执行相同次数的 unlock 操作,变量才会被解锁。lock 和 unlock 必须成对出现
- 7、如果对一个变量执行 lock 操作,将会清空工作内存中此变量的值,在执行引擎使用这个变量前需要重新执行 load 或 assign 操作初始化变量的值
- 8、如果一个变量事先没有被 lock 操作锁定,则不允许对它执行 unlock 操作;也不允许去 unlock 一个被其他线程锁定的变量。
- 9、对一个变量执行 unlock 操作之前,必须先把此变量同步到主内存中(执行 store 和 write 操作)。
重排序:
为什么要重排序?
为了提高性能,编译器与处理器通常会对指令做重排序,通常为3种
- 1、编译器优化的重排序,不改变单线程语义的情况下重新安排语句执行顺序。
- 2、指令级并行的重排序,现在处理器采用指令并行技术,可将多条指令并行执行。如果不存在数据依赖性,可以改变语句对应指令顺序。
- 3、内存系统的重排序,由于处理器使用缓存和读写缓存区,使得加载和存储操作看上去是乱序执行。
源代码如何变成执行指令?
步骤:源代码->1 编译器优化重排序->2 指令级并行重排序->3 内存系统重排序-> 最终执行指令
对于步骤 1 是编译器重排序,步骤 2、3 是处理器重排序。
对于编译器重排序,JMM 的编译器重排序规则会禁止特定类型的重排序
对于处理器重排序,JMM 的处理器重排序规则会要求 Java 编译器生成指令序列时插入特定类型的内存屏障指令来禁止特定类型的重排序。
内存屏障
load(装载):作用于工作内存的变量,它把 read 操作从主内存中得到的变量值放入工作内存的变量副本中。
store(存储):作用于工作内存的变量,把工作内存中的一个变量的值传送到主内存中,以便随后的 write 的操作。
内存屏障的四种类型如下:
屏障类型 | 指令示例 | 说明 |
LoadLoad 屏障 | Store1;StoreStore;Store2 | 确保 Load1 数据装载先于 Load2 及所有后续装载指令的装载 |
StoreStore 屏障 | Store1;StoreStore;Store2 | 确保 Store1 数据对其他处理器可见(刷新到内存)先于 Store2 及所有后续存储指令的存储 |
LoadStore 屏障 | Load1;LoadStore;Store2 | 确保 Load1 数据装载先于 Store2 及所有后续存储指令刷新到内存 |
StoreLoad 屏障 | Store1;StoreLoad;Load2 | 确保 Store1 数据对其他处理器可见(刷新到内存)先于 Load2 及所有后续装载指令的装载。该屏障会使之前所有的内存访问指令(存储和装载)完成之后,才执行该屏障之后的内存访问指令 |
happens-before 规则:
- 程序顺序规则:在一个线程内,按照程序代码的顺序,前面的代码运行的结果能被后面的代码可见
- 监视器锁规则:一个锁的解锁 happens-before 于后续对这个锁的加锁
- volatile 变量规则:对一个 volatile 域的写,happens-before 于任意后续对这个 volatile 域的读
- 传递性规则:如果 A happens-before B,且 B happens-before C,那么 A happens-before C
- start() 规则:指的是主线程 A 启动子线程 B 后,子线程 B 能看到主线程在启动线程 B 前的任何操作
- join() 规则:主线程 A 等待子线程 B 完成 (对 B 线程 join() 调用),当子线程 B 操作完成后,主线程 A 能看到 B 线程的操作
- interrupt() 规则:线程 A 调用线程 B 的 interrupt() 方法,happens-before 于线程 B 检测中断事件 (也就是 Thread.interrupted() 方法)
- finalize() 规则:对象的构造函数执行、结束 happens-before 于 finalize() 方法的开始
问题总结:
为什么要有 JMM?
为了在不同处理器下正确并发执行,JMM 提出一系列规范约束各个处理器的指令执行达到正确并发效果,主要为多线程下原子性、可见性、有序性问题
为什么重排序?
为了提高执行效率,把程序执行指令重排序。重排序在多线程情况下可能导致程序执行错误
重排序可能导致程序执行错误怎么解决?
JMM 提出 happens-before 语义规则,约束重排序规则
如何约束重排序规则?
插入内存屏障指令告诉处理器如何正确的重排序
顺序一致性
顺序一致性内存模型
顺序一致性内存模型是一个被计算机科学家理想化了的理论参考模型,它为程序员提供了极强的内存可见性保证。JMM 在规范里也保证了顺序一致性。顺序一致性内存模型有两大特性:
① 一个线程中的所有操作必须按照程序的顺序来执行
② 所有线程都只能看到一个单一的操作执行顺序
=举例说明=:
假设有两个线程 A 和 B 并发执行(线程 A 执行后线程 B 执行)。其中
- A 线程有三个操作,它们在程序中的顺序是:A1->A2->A3。
- B 线程有三个操作,它们在程序中的顺序是:B1->B2->B3。
针对第 ① 特征说明:
线程 A 程序的执行顺序永远是 A1->A2->A3
线程 B 程序的执行顺序永远是 B1->B2->B3
针对第 ② 特征说明:
如果正确同步的话,线程 A 执行后释放监视器给线程 B 执行顺序将是 A1->A2->A3->B1->B2->B3。
如果未正确同步的话,可能(CPU 抢占问题)出现的顺序是 A1->A2->B1->A3->B2->B3。
CAS实现原理
CAS( Compare And Swap )
Java 的 compareAndSet(jdk13)/compareAndSwap(jdk1.8) 相关方法调用简称为 CAS。
CAS 机制当中使用了 3 个基本操作数:内存地址 V,旧的预期值 A,要修改的新值 B。更新一个变量的时候,只有当变量的预期值 A 和内存地址 V 当中的实际值相同时,才会将内存地址 V 对应的值修改为 B。
CAS 操作的是乐观锁,每次不加锁而是假设没有冲突而去完成某项操作,如果因为冲突失败就重试,直到成功为止。
@HotSpotIntrinsicCandidate
public final native boolean compareAndSwapInt(Object o, long offset,
int expected,
int x);
- o 是操作的对象
- offset 是 o 对象中某字段在内存中的偏移量(比如对象 AtomicInteger 中有一个 volatile int value 的字段)
- 读取传入对象 o 在内存中偏移量为 offset 位置的值与期望值 expected 作比较。
- 相等就把 x 值赋值给 offset 位置的值。方法返回 true。
- 不相等,就取消赋值,方法返回 false。
CAS 实现原子操作的三大问题
问题一:ABA 问题
因为 CAS 需要在操作值的时候,检查值有没有发生变化,如果没有发生变化则更新,但是如果一个值原来是 A,变成了 B,又变成了 A,那么使用 CAS 进行检查时会发现它的值没有发生变化,但是实际上却变化了。
解决方案:使用版本号。在变量前面 追加上版本号,每次变量更新的时候把版本号加 1,那么 A→B→A 就会变成 1A→2B→3A。从 Java 1.5 开始,JDK 的 Atomic 包里提供了一个类 AtomicStampedReference 来解决 ABA 问题。
这个类的 compareAndSet 方法的作用是首先检查当前引用是否等于预期引用,并且检查当前标志是否等于预期标志,如果全部相等,则以原子方式将该引用和该标志的值设置为给定的更新值。
问题二:循环时间长开销大
自旋 CAS 如果长时间不成功,会给 CPU 带来非常大的执行开销。
如果 JVM 能支持处理器提供的 pause 指令,那么效率会有一定的提升。
pause 指令有两个作用:
它可以延迟流水线执行指令 (de-pipeline),使 CPU 不会消耗过多的执行资源,延迟的时间取决于具体实现的版本,在一些处理器上延迟时间是零;
它可以避免在退出循环的时候因内存顺序冲突 (Memory Order Violation) 而引起 CPU 流水线被清空 (CPU Pipeline Flush),从而提高 CPU 的执行效率。
问题三:只能保证一个共享变量的原子操作
当对一个共享变量执行操作时,我们可以使用循环 CAS 的方式来保证原子操作,但是对多个共享变量操作时,循环 CAS 就无法保证操作的原子性,这个时候就可以用锁。
还有一个取巧的办法,就是把多个共享变量合并成一个共享变量来操作。比如,有两个共享变量 i=2,j=a,合并一下 ij=2a,然后用 CAS 来操作 ij。从 Java 1.5 开始,JDK 提供了 AtomicReference 类来保证引用对象之间的原子性,就可以把多个变量放在一个对象里来进行 CAS 操作。
原子操作
什么是原子操作?
原子本意是“不能被进一步分割的最小粒子”
原子操作意为“不可被中断的一个或一系列操作”。
处理器如何实现原子操作?
处理器提供总线锁定和缓存锁定两个机制来保证复杂内存操作的原子性。
使用总线锁保证原子性
第一个机制是通过总线锁保证原子性。如果多个处理器同时对共享变量进行读改写操作 (i++就是经典的读改写操作),那么共享变量就会被多个处理器同时进行操作,这样读改写操作就不是原子的,操作完之后共享变量的值会和期望的不一致。
原因可能是多个处理器同时从各自的缓存中读取变量 i,分别进行加 1 操作,然后分别写入 系统内存中。那么,想要保证读改写共享变量的操作是原子的,就必须保证 CPU1 读改写共享变量的时候,CPU2 不能操作缓存了该共享变量内存地址的缓存。
处理器使用总线锁就是来解决这个问题的。所谓总线锁就是使用处理器提供的一个 LOCK#信号,当一个处理器在总线上输出此信号时,其他处理器的请求将被阻塞住,那么该处理器可以独占共享内存。
使用缓存锁保证原子性(锁的粒度变小了)
第二个机制是通过缓存锁定来保证原子性。在同一时刻,我们只需保证对某个内存地址的操作是原子性即可,但总线锁定把 CPU 和内存之间的通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,目前处理器在某些场合下使用缓存锁定代替总线锁定来进行优化。
频繁使用的内存会缓存在处理器的 L1、L2 和 L3 高速缓存里,那么原子操作就可以直接在处理器内部缓存中进行,并不需要声明总线锁,在 Pentium 6 和目前的处理器中可以使用“缓存 锁定”的方式来实现复杂的原子性。
所谓“缓存锁定”是指内存区域如果被缓存在处理器的缓存行中,并且在 Lock 操作期间被锁定,那么当它执行锁操作回写到内存时,处理器不在总线上声 言 LOCK#信号,而是修改内部的内存地址并允许它的缓存一致性机制来保证操作的原子性,因为缓存一致性机制会阻止同时修改由两个以上处理器缓存的内存区域数据,当其他处 理器回写已被锁定的缓存行的数据时,会使缓存行无效,i++例子中,当 CPU1 修 改缓存行中的 i 时使用了缓存锁定,那么 CPU2 就不能同时缓存 i 的缓存行。
但是有两种情况下处理器不会使用缓存锁定。
- 当操作的数据不能被缓存在处理器内部,或操作的数据跨多个缓存行 (cache line) 时,则处理器会调用总线锁定。
- 有些处理器不支持缓存锁定。对于 Intel 486 和 Pentium 处理器,就算锁定的内存区域在处理器的缓存行中也会调用总线锁定。
针对以上两个机制,我们通过 Intel 处理器提供了很多 Lock 前缀的指令来实现。例如,位测试和修改指令:BTS、BTR、BTC;交换指令 XADD、CMPXCHG,以及其他一些操作数和逻辑指令 (如 ADD、OR) 等,被这些指令操作的内存区域就会加锁,导致其他处理器不能同时访问它。
Java 如何实现原子操作
- CAS:保证原子性
- volatile:单个操作保证原子性,组合操作(例如:++操作符)不保证原子性
- synchronized:保证原子性
- Lock:保证原子性
原子性是否保证可见性?
原子性不一定保证可见性。比如 CAS 只解决了比较和更新的原子性的问题,要保证可见性,需要加锁或者是用 volatile 修饰变量。
volatile
定义:
java 编程语言允许线程访问共享变量,为了确保共享变量能被准确和一致的更新,线程应该确保通过排他锁单独获得这个变量。Java 语言提供了 volatile,在某些情况下比锁更加方便。如果一个字段被声明成 volatile,java 线程内存模型确保所有线程看到这个变量的值是一致的。
特性:
- 可见性 : 对一个 volatile 的变量的读,总是能看到任意线程对这个变量最后的写入.
- 单个读或者写具有原子性 : 对于单个 volatile 变量的读或者写具有原子性,复合操作不具有.(如 i++)
- 互斥性 : 同一时刻只允许一个线程对变量进行操作.(互斥锁的特点)
从 JSR-133 开始 (即从 JDK5 开始),volatile 变量的写-读可以实现线程之间的通信。从内存语义的角度来说:
- volatile 的写-读与锁的释放-获取有相同的内存效果:
- volatile 写和锁的释放有相同的内存语义;volatile 读与锁的获取有相同的内存语义。
写-读的内存语义
volatile 写的内存语义:当写一个 volatile 变量时,JMM 会把该线程对应的本地内存中的共享变量值刷新到主内存
volatile 读的内存语义:当读一个 volatile 变量时,JMM 会把该线程对应的本地内存置为无效。线程接下来将从主内存中读取共享变量。
内存语义的实现?
对于编译器来说,发现一个最优布置来最小化插入屏障的总数几乎不可能。为此,JMM 采取保守策略。下面是基于保守策略的 JMM 内存屏障插入策略:
- 在每个 volatile 写操作的前面插入一个 StoreStore 屏障。
- 在每个 volatile 写操作的后面插入一个 StoreLoad 屏障。
- 在每个 volatile 读操作的后面插入一个 LoadLoad 屏障。
- 在每个 volatile 读操作的后面插入一个 LoadStore 屏障。
上述内存屏障插入策略非常保守,但它可以保证在任意处理器平台,任意的程序中都能得到正确的 volatile 内存语义。
图中的 StoreStore 屏障可以保证在 volatile 写之前,其前面的所有普通写操作已经对任意处理器可见了。这是因为 StoreStore 屏障将保障上面所有的普通写在 volatile 写之前刷新到主内存。
volatile 写后面的 StoreLoad 屏障作用是避免 volatile 写与后面可能有的 volatile 读/写操作重排序。
因为编译器常常无法准确判断在一个 volatile 写的后面 是否需要插入一个 StoreLoad 屏障 (比如,一个 volatile 写之后方法立即 return)。
为了保证能正确实现 volatile 的内存语义,JMM 在采取了保守策略:在每个 volatile 写的后面,或者在每个 volatile 读的前面插入一个 StoreLoad 屏障。
从整体执行效率的角度考虑,JMM 最终选择了在每个 volatile 写的后面插入一个 StoreLoad 屏障。因为 volatile 写-读内存语义的常见使用模式是:一个写线程写 volatile 变量,多个读线程读同一个 volatile 变量。当读线程的数量大大超过写线程时,选择在 volatile 写之后插入 StoreLoad 屏障将带来可观的执行效率的提升。从这里可以看到 JMM 在实现上的一个特点:首先确保正确性,然后再去追求执行效率。
图中的 LoadLoad 屏障用来禁止处理器把上面的 volatile 读与下面的普通读重排序。LoadStore 屏障用来禁止处理器把上面的 volatile 读与下面的普通写重排序。
应用场景
只能在有限的一些情形下使用 volatile 变量替代锁。要使 volatile 变量提供理想的线程安全,必须同时满足下面两个条件:
- 对变量的写操作不依赖于当前值。(i++就依赖当前值)
- 该变量没有包含在具有其他变量的不变式中。
volatile 的性能
- 使用 volatile 变量的主要原因是其简易性:在某些情形下,使用 volatile 变量要比使用相应的锁简单得多。
- 使用 volatile 变量次要原因是其性能:某些情况下,volatile 变量同步机制的性能要优于锁。
很难做出准确、全面的评价,例如 “X 总是比 Y 快”,尤其是对 JVM 内在的操作而言。(例如,某些情况下 VM 也许能够完全删除锁机制,这使得我们难以抽象地比较 volatile 和 synchronized 的开销。)就是说,在目前大多数的处理器架构上,volatile 读操作开销非常低 —— 几乎和非 volatile 读操作一样。而 volatile 写操作的开销要比非 volatile 写操作多很多,因为要保证可见性需要实现内存界定(Memory Fence),即便如此,volatile 的总开销仍然要比锁获取低。
volatile 操作不会像锁一样造成阻塞,因此,在能够安全使用 volatile 的情况下,volatile 可以提供一些优于锁的可伸缩特性。如果读操作的次数要远远超过写操作,与锁相比,volatile 变量通常能够减少同步的性能开销。
正确使用 volatile 的模式
始终牢记使用 volatile 的限制:只有在状态真正独立于程序内其他内容时才能使用 volatile。
使用场景
1、状态标志
仅仅是使用一个布尔状态标志,用于指示发生了一个重要的一次性事件,例如完成初始化或请求停机。
volatile boolean shutdownRequested;
public void shutdown() { shutdownRequested = true; }
public void doWork() {
while (!shutdownRequested) {
// do stuff
}
}
2、双重检查锁定(double-checked-locking)
synchronized 实现原理
JVM 基于进入和退出 Monitor 对象来实现方法同步和代码块同步,但两者的实现细节不一样。
- 代码块同步:使用 monitorenter 和 monitorexit 指令实现的
- 方法同步:使用另外一种方式实现的,细节在 JVM 规范里并没有详细说明。但是,方法的同步同样可以使用这两个指令来实现。
JVM 要保证每个 monitorenter 必须有对应的 monitorexit 与之配对。
任何对象都有一个 monitor 与之关联,当且一个 monitor 被持有后,它将处于锁定状态。线程执行到 monitorenter 指令时,将会尝试获取对象所对应的 monitor 的所有权,即尝试获得对象的锁。
synchronized 的并发特性
- synchronized 保证原子性
- synchronized 保证可见性
- synchronized 保证有序性
- 可重入性
- 不可中断性
三种使用
- 对于普通同步方法,锁是当前实例化的对象。
- 对于静态同步方法,锁是当前类的 Class 对象。
- 对于同步方法块,锁是 synchronized 括号里配置的对象
Java 对象头
synchronized 用的锁是存在 Java 对象头里的。(1 字宽 等于 4 字节,即 32bit)
- 如果对象是数组类型,则虚拟机用 3 个字宽 (Word) 存储对象头
- 如果对象是非数组类型,则用 2 字宽存储对象头。在 32 位虚拟机中
Java 对象头里的 Mark Word 里默认存储对象的 HashCode、分代年龄和锁标记位。
在运行期间,Mark Word 里存储的数据会随着锁标志位的变化而变化。
锁的升级与对比
Java SE 1.6 为了减少获得锁和释放锁带来的性能消耗,引入了“偏向锁”和“轻量级锁”,在 Java SE 1.6 中,锁一共有 4 种状态,级别从低到高依次是:无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。
无锁
无锁没有对资源进行锁定,所有的线程都能访问并修改同一个资源,但同时只有一个线程能修改成功。
1、无锁的特点:就是修改操作在循环内进行,线程会不断的尝试修改共享资源。如果没有冲突就修改成功并退出,否则就会继续循环尝试。如果有多个线程修改同一个值,必定会有一个线程能修改成功,而其他修改失败的线程会不断重试直到修改成功。
2、实现机制:CAS 原理及应用即是无锁的实现。无锁无法全面代替有锁,但无锁在某些场合下的性能是非常高的。
偏向锁
偏向锁是指一段同步代码一直被一个线程所访问,那么该线程会自动获取锁,降低获取锁的代价。
1、为什么引入:在大多数情况下,锁总是由同一线程多次获得,不存在多线程竞争,所以出现了偏向锁。其目标就是在只有一个线程执行同步代码块时能够提高性能。
2、实现机制:当一个线程访问同步代码块并获取锁时,会在 Mark Word 里存储锁偏向的线程 ID。在线程进入和退出同步块时不再通过 CAS 操作来加锁和解锁,而是检测 Mark Word 里是否存储着指向当前线程的偏向锁。
引入偏向锁是为了在无多线程竞争的情况下尽量减少不必要的轻量级锁执行路径,因为轻量级锁的获取及释放依赖多次 CAS 原子指令,而偏向锁只需要在置换 ThreadID 的时候依赖一次 CAS 原子指令即可。
3、偏向锁的撤销:偏向锁只有遇到其他线程尝试竞争偏向锁时,持有偏向锁的线程才会释放锁,线程不会主动释放偏向锁。偏向锁的撤销需要等待全局安全点(在这个时间点上没有字节码正在执行),它会首先暂停拥有偏向锁的线程,判断锁对象是否处于被锁定状态。撤销偏向锁后恢复到无锁(标志位为“01”)或轻量级锁(标志位为“00”)的状态。
4.关闭偏向锁:偏向锁在 JDK 6 及以后的 JVM 里是默认启用的。可以通过 JVM 参数关闭偏向锁:-XX:-UseBiasedLocking=false,关闭之后程序默认会进入轻量级锁状态。
偏向锁的初始化流程
轻量级锁
轻量级锁是指当锁是偏向锁的时候,被另外的线程所访问,偏向锁就会升级为轻量级锁,其他线程会通过自旋的形式尝试获取锁,不会阻塞,从而提高性能。
解锁:如果轻量级锁的更新操作失败了,虚拟机首先会检查对象的 Mark Word 是否指向当前线程的栈帧,如果是就说明当前线程已经拥有了这个对象的锁,那就可以直接进入同步块继续执行,否则说明多个线程竞争锁。
若当前只有一个等待线程,则该线程通过自旋进行等待。但是当自旋超过一定的次数,或者一个线程在持有锁,一个在自旋,又有第三个来访时,轻量级锁升级为重量级锁。
重量级锁
升级为重量级锁时,锁标志的状态值变为“10”,此时 Mark Word 中存储的是指向重量级锁的指针,此时等待锁的线程都会进入阻塞状态。
偏向锁通过对比 Mark Word 解决加锁问题,避免执行 CAS 操作。而轻量级锁是通过用 CAS 操作和自旋来解决加锁问题,避免线程阻塞和唤醒而影响性能。重量级锁是将除了拥有锁的线程以外的线程都阻塞。
什么是锁
随着集成电路越来越发达,多计算核心的机器大行其道,为了解决多个并行执行分支对某一块资源的同步访问,操作系统层面提供了互斥信号量的概念。
在几乎所有的支持多线程编程模型的语言中,基本上都提供了与互斥信号量对应的概念,在 Java 中我们称之为锁。
锁的内存语义分析
下面将借助 ReentrantLock 的源代码,来分析锁内存语义的具体实现机制。
public class ReentrantLockExample {
int value = 0;
private final ReentrantLock lock = new ReentrantLock();
public void writer() {
lock.lock(); // 加锁
try {
value++;
} finally {
lock.unlock(); // 解锁
}
}
public void reader() {
lock.lock(); // 加锁
try {
int tmp = value;
// do something ...
System.out.println(tmp);
} finally {
lock.unlock(); // 解锁
}
}
}
在 ReentrantLock 中,调用 lock() 方法获取锁;调用 unlock() 方法释放锁。
ReentrantLock 的实现依赖于 Java 同步器框架 AbstractQueuedSynchronizer(本文简称之为 AQS)。
AQS 使用一个整型的 volatile 变量 (命名为 state) 来维护同步状态。
ReentrantLock 内部实现了公平锁(FairSync)和非公平锁(NonfairSync)。
公平锁和非公平锁加锁逻辑存在差异,公平锁按线程排队优先级获取锁,非公平自然竞争。解锁逻辑完全一样。
公平锁-加锁分析
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);
}
// 公平锁-加锁最终执行方法
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState(); // 获取锁的开始,首先读 volatile 变量 state
if (c == 0) { // 可以竞争
if (!hasQueuedPredecessors() && // 是否排在线程队列头节点(公平)
compareAndSetState(0, acquires)) { // CAS 方式修改 state
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) { // 当前线程重入
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
非公平锁-加锁分析
static final class NonfairSync extends Sync {
final void lock() {
if (compareAndSetState(0, 1)) // 无竞争情况直接获取锁
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1); // 存在竞争使用 nonfairTryAcquire 竞争锁资源
}
// nonfairTryAcquire 实现中无需判断 hasQueuedPredecessors 线程优先级
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}
// AbstractQueuedSynchronizer 中提供 CAS 操作
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
公平锁非公平锁-解锁分析
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 是否完全释放(重入锁,即多次加锁时需多次释放)
free = true;
setExclusiveOwnerThread(null);
}
setState(c); // 释放锁的最后,写 volatile 变量 state
return free;
}
释放锁的最后写 volatile 变量 state,在获取锁时首先读这个 volatile 变量。
根据 volatile 的 happens-before 规则,释放锁的线程在写 volatile 变量之前可见的共享变量,在获取锁的线程读取同一个 volatile 变量后将立即变得对获取锁的线程可见
锁内存语义总结
由于 Java 的 CAS 同时具有 volatile 读和 volatile 写的内存语义,因此 Java 线程之间的通信现在有了下面 4 种方式。
- A 线程写 volatile 变量,随后 B 线程读这个 volatile 变量。
- A 线程写 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。
- A 线程用 CAS 更新一个 volatile 变量,随后 B 线程用 CAS 更新这个 volatile 变量。
- A 线程用 CAS 更新一个 volatile 变量,随后 B 线程读这个 volatile 变量。
锁的通用化的实现模式:
- 声明共享变量为 volatile。
- 使用 CAS 的原子条件更新来实现线程之间的同步。
- 配合以 volatile 的读/写和 CAS 所具有的 volatile 读和写的内存语义来实现线程之间的通信。
concurrent 包实现
volatile 变量的读/写和 CAS 可以实现线程之间的通信。把这些特性整合在一起,就形成了整个 concurrent 包得以实现的基石。
AQS,非阻塞数据结构和原子变量类 (java.util.concurrent.atomic 包中的类),这些 concurrent 包中的基础类都是使用这种模式来实现的,而 concurrent 包中的高层类又是依赖于这些基础类来实现的。
原子操作类说明
当程序更新一个变量时,如果是多线程同时更新这个变量,可能得到的结果与期望值不同。我们可以用 并发关键字-volatile、并发关键字-synchronized、Lock 来解决并发读写问题。
但是从性能及语义上可能存在以下问题:
- volatile 不能保证组合操作的原子性(比如自增操作)
- synchronized 和 Lock 比较重量级
从 java1.5 开始,jdk 提供了 java.util.concurrent.atomic 包,这个包中的原子操作类,提供了一种用法简单,性能高效,线程安全的更新一个变量的方式。
atomic 包里提供原子更新类型分别是:原子更新基本类型,原子更新数组,原子更新引用,原子更新属性,这些类都是使用 Unsafe 实现的包装类。
原子更新基本类型
Atomic 包提供了以下 3 个类:
- AtomicBoolean:原子更新布尔类型。
- AtomicInteger:原子更新整型。
- AtomicLong:原子更新长整型。
常用方法解释
以上 3 个类提供的方法几乎一模一样,本节仅以 AtomicInteger 为例进行讲解。AtomicInteger 的常用方法如下:
- set : 设置为新值
- get : 获取当前的值
- getAndAdd : 设置为新值,返回旧值
- getAndIncrement : 设置为新值 (+1 操作),返回旧值
- getAndDecrement : 设置为新值 (-1 操作),返回旧值
- incrementAndGet : 设置为新值 (+1 操作),返回新值
- decrementAndGet : 设置为新值 (-1 操作),返回新值
- addAndGet : 设置为新值,返回新值
- lazySet : 最终会设置成新值,使用 lazySet 设置值后,可能导致其他 线程在之后的一小段时间内还是可以读到旧的值。参考
- compareAndSet : 如果等于预期的值则设置为新值,返回是否设置成功
原子更新数组
原子的方式更新数组里的某个元素,Atomic 包提供了以下 3 个类:
- AtomicIntegerArray:原子更新整型数组里的元素。
- AtomicLongArray:原子更新长整型数组里的元素。
- AtomicReferenceArray:原子更新引用类型数组里的元素
常用方法解释
以上 3 个类提供的方法几乎一模一样,AtomicIntegerArray 与 AtomicInteger 常用方法基本一致,不同之处在于基本类型类操作的是单个变量,数据类型操作的是某个下标对应的变量。
因此在 AtomicInteger 方法的基础上增加了一个指定下标的参数,内部方法处理逻辑一致,举例说明:
int[] arr = new int[]{1,2,3,4};
final AtomicIntegerArray array = new AtomicIntegerArray(arr);
System.out.printf(""+array.incrementAndGet(0)+"_"+arr[0]+"_"+array.length());
//打印 2_1_4
原子更新引用类型
Atomic 包提供了以下 3 个类:
- AtomicReference:原子更新引用类型。
- AtomicReferenceFieldUpdater:原子更新引用类型里的字段。
- AtomicMarkableReference:原子更新带有标记位的引用类型。可以原子更新一个布尔类型的标记位和引用类型。
支持的方法与 AtomicInteger 处理逻辑一致,举例说明:
@Slf4j
public class AtomicRefExample {
public static void main(String[] args) {
final AtomicReference<User> reference = new AtomicReference<>(new User("liw", 28));
final User user = reference.getAndSet(new User("Li.W", 28));
log.info("user.getName() = {}", user.getName()); // liw
log.info("reference.get().getName() = {}", reference.get().getName()); // Li.W
final AtomicMarkableReference<User> markableReference =
new AtomicMarkableReference<>(user, false);
markableReference.compareAndSet(user, user, false, true);
log.info("markableReference {},{}", markableReference.getReference(), markableReference.isMarked()); // true
}
@AllArgsConstructor
@Getter
private static class User {
private String name;
private int age;
}
}
原子更新类属性
Atomic 包提供了以下 3 个类进行原子字段更新:
- AtomicIntegerFieldUpdater:原子更新整型的字段的更新器。
- AtomicLongFieldUpdater:原子更新长整型字段的更新器。
- AtomicStampedReference:原子更新带有版本号的引用类型。该类将整数值与引用关联起来,可用于原子的更新数据和数据的版本号,可以解决使用 CAS 进行原子更新时可能出现的 ABA 问题
要想原子地更新字段类需要两步。
1、因为原子更新字段类都是抽象类,每次使用的时候必须使用静态方法 newUpdater 创建一个更新器,并且需要设置想要更新的类和属性。
2、更新类的字段(属性)必须使用 public volatile 修饰符。
例如:
@Slf4j
public class AtomicIntegerFieldUpdaterExample {
public static void main(String[] args) {
final AtomicIntegerFieldUpdater<User> updater =
AtomicIntegerFieldUpdater.newUpdater(User.class, "age");
final User user = new User("Li", 28);
updater.set(user, 30);
int increment = updater.incrementAndGet(user);
log.info("increment return = {}", increment); // 31
}
@AllArgsConstructor
@Getter
private static class User {
private String name;
public volatile int age;
}
}
高性能原子类
高性能原子类,是 java1.8 中增加的原子类,它们使用分段的思想,把不同的线程 hash 到不同的段上去更新,最后再把这些段的值相加得到最终的值,这些类主要有:
Striped64 下面四个类的子类:
- LongAccumulator: long 类型的聚合器,需要传入一个 long 类型的二元操作,可以用来计算各种聚合操作,包括加乘等。
- LongAdder: long 类型的累加器,LongAccumulator 的特例,只能用来计算加法,且从 0 开始计算。
- DoubleAccumulator: double 类型的聚合器,需要传入一个 double 类型的二元操作,可以用来计算各种聚合操作,包括加乘等。
- DoubleAdder: double 类型的累加器,DoubleAccumulator 的特例,只能用来计算加法,且从 0 开始计算。
这几个类的操作基本类似,其中 DoubleAccumulator 和 DoubleAdder 底层其实也是用 long 来实现的,基本用法如下:
@Slf4j
public class Striped64Example {
public static void main(String[] args) {
final LongAdder longAdder = new LongAdder();
longAdder.increment();
longAdder.add(666);
System.out.println(longAdder.sum());
log.info("longAdder.sum() = {}", longAdder.sum()); // 667
final LongAccumulator longAccumulator =
new LongAccumulator((left, right) -> left + right * 2, 666);
longAccumulator.accumulate(1);
longAccumulator.accumulate(3);
longAccumulator.accumulate(-4);
log.info("longAccumulator.get() = {}", longAccumulator.get()); // 666
}
}
二、什么是线程
什么是进程:现代操作系统在运行一个程序时,会为其创建一个进程。例如,启动一个 Java 程序,操作系统就会创建一个 Java 进程。现代操作系统调度的最小单元是线程,也叫轻量级进程,在一个进程里可以创建多个线程,这些线程都拥有各自的计数器、堆栈和局部变量等属性,并且能够访问共享的内存变量。
为什么要用线程
- 更多的处理器核心(利用处理器高频多核心优势)
- 更快的响应时间(并行计算)
- 更好的编程模型(方便开发人员编程)
线程优先级
现代操作系统基本采用时分的形式调度运行的线程,操作系统会分出一个个时间片,线程会分配到若干时间片,当线程的时间片用完了就会发生线程调度,并等待着下次分配。线程分配到的时间片多少也就决定了线程使用处理器资源的多少,而线程优先级就是决定线程需要多或者少分配一些处理器资源的线程属性。
如何控制优先级?
通过 setPriority(int) 方法来修改优先级,默认优先级是 5(范围从 1~10),优先级高的线程分配时间片的数量要多于优先级低的线程。
设置线程优先级时考虑情况:
- 频繁阻塞 (休眠或者 I/O 操作) 的线程需要设置较高优先级
- 偏重计算 (需要较多 CPU 时间或者偏运算) 的线程则设置较低的优先级,确保处理器不会被独占
线程状态
1、初始状态(NEW)
尚未启动的线程处于此状态。
2、就绪状态(除CPU之外,其它的运行所需资源都已全部获得)
- 调用线程的 start 方法,此线程进入就绪状态。
- 当前线程 sleep 方法结束,其他线程 join 结束,等待用户输入完毕,某个线程拿到对象锁,这些线程也将进入就绪状态。
- 当前线程时间片用完了,调用当前线程的 yield 方法,当前线程进入就绪状态。
- 锁池里的线程拿到对象锁后,进入就绪状态。
3、运行中状态(RUNNABLE)
线程调度程序从可运行池中选择一个线程作为当前线程时,当前线程线程所处的状态就是运行状态。(JDK 把就绪状态归于 RUNNABLE 状态)
4、阻塞状态(BLOCKED)
阻塞状态是线程阻塞在等待监视器的状态。
5、等待(WAITING)
处于这种状态的线程不会被分配 CPU 执行时间,它们要等待被显式地做出一些动作(通知或者中断),否则会处于无限期等待的状态。
6、超时等待(TIMED_WAITING)
处于这种状态的线程不会被分配 CPU 执行时间,不过无须无限期等待被其他线程显示地唤醒,在达到一定时间后它们会自动唤醒。
7、终止状态(TERMINATED)
当线程的 run 方法完成时,或者主线程的 main 方法完成时,我们就认为它终止了。这个线程对象也许是活的,但是,它已经不是一个单独执行的线程。线程一旦终止了,就不能复生。
在一个终止的线程上调用 start 方法,会抛出 java.lang.IllegalThreadStateException 异常。
线程的初始化、启动、中断、停止
初始化线程(init)
线程类 Thread 构造函数最终调用 init 方法如下:
private void init(ThreadGroup g, Runnable target, String name,
long stackSize, AccessControlContext acc,
boolean inheritThreadLocals) {
if (name == null) {
throw new NullPointerException("name cannot be null");
}
this.name = name; // 线程名称
Thread parent = currentThread(); // 当前线程就是该线程的父线程
g.addUnstarted();
this.group = g; // 设置所属线程组
this.daemon = parent.isDaemon(); // 将 daemon 属性设置为父线程的对应属性
this.priority = parent.getPriority(); // 将 priority 属性设置为父线程的对应属性
if (security == null || isCCLOverridden(parent.getClass()))
this.contextClassLoader = parent.getContextClassLoader();
else
this.contextClassLoader = parent.contextClassLoader;
this.inheritedAccessControlContext =
acc != null ? acc : AccessController.getContext();
this.target = target;
setPriority(priority);
if (inheritThreadLocals && parent.inheritableThreadLocals != null)
this.inheritableThreadLocals =
ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);
this.stackSize = stackSize; // 存放指定的堆栈大小,零表示该参数将被忽略
tid = nextThreadID(); // 分配一个线程 ID
}
启动线程(start)
线程对象在初始化完成之后,调用 start 方法就可以启动这个线程。
线程 start 方法的含义是:当前线程 (即 parent 线程) 同步告知 Java 虚拟机,只要线程规划器空闲,应立即启动调用 start 方法的线程
public class Thread implements Runnable
Runnable 接口中只有一个 run 方法。一个类实现 Runnable 接口后,并不代表该类是一个“线程”类,不能直接运行,必须通过 Thread 实例才能创建并运行线程。
Thread.start 方法源码:
public synchronized void start() {
if (threadStatus != 0)
throw new IllegalThreadStateException();
group.add(this);
boolean started = false;
try {
start0();
started = true;
} finally {
try {
if (!started) {
group.threadStartFailed(this);
}
} catch (Throwable ignore) {
/* do nothing. If start0 threw a Throwable then
it will be passed up the call stack */
}
}
}
private native void start0(); // 最终底层调用启动线程
过期的 suspend、resume 和 stop
suspend、resume 和 stop 方法分别致使线程暂停、恢复和终止工作
不建议使用的原因主要有:
- suspend/resume 方法,在调用后,线程不会释放已经占有的资源 (比如锁),而是占有着资源进入睡眠状态,这样容易引发死锁问题。
- stop 方法,在终结一个线程时不会保证线程的资源正常释放,通常是没有给予线程完成资源释放工作的机会,因此会导致程序可能工作在不确定状态下。
线程 join
如果一个线程 A 执行了 thread.join 语句,其含义是:当前线程 A 等待 thread 线程终止之后才从 thread.join 返回。
线程 Thread 除了提供 join 方法之外,还提供了 join(long millis) 和 join(long millis,int nanos) 两个具备超时特性的方法。这两个超时方法表示,如果线程 thread 在给定的超时时间里没有终止,那么将会从该超时方法中返回。
join 方法使用示例
模拟了 3 个线程,每个线程中引用了前一个线程(previous)。
在 run 方法中执行previous.join() 表示 previous 线程执行完成后才可以执行 previous.join() 后的逻辑。
public static void main(String[] args) throws InterruptedException {
Thread previous = Thread.currentThread();
for (int i = 0; i < 3; i++) {
// 每个线程拥有前一个线程的引用,需要等待前一个线程终止,才能从等待中返回
final Thread thread = new Thread(new Domino(previous), "t-" + i);
thread.start();
previous = thread;
}
TimeUnit.SECONDS.sleep(5);
System.out.println(Thread.currentThread().getName() + " terminate.");
}
static class Domino implements Runnable {
private Thread previous; // 当前线程的前一个线程引用
public Domino(Thread previous) {
this.previous = previous;
}
@Override
public void run() {
final String name = Thread.currentThread().getName();
try {
System.out.println(name + " before .join()"); // join 方法调用前逻辑
previous.join();
System.out.println(name + " after .join()"); // join 方法调后前逻辑
} catch (InterruptedException e) {
}
System.out.println(name + " terminate.");
}
}
/*
最终打印
t-0 before .join()
t-2 before .join()
t-1 before .join()
main terminate.
t-0 after .join()
t-0 terminate.
t-1 after .join()
t-1 terminate.
t-2 after .join()
t-2 terminate.
*/
什么时候用 join?
线程之间存在一种等待关系时使用。比如 A 线程必须等 B 线程执行后才可以执行 join 后逻辑。
join 怎么实现的?
参考 Thread.join 源码分析可知实现使用了 wait 机制,即 《线程等待通知机制》
public final synchronized void join(long millis)
throws InterruptedException {
long base = System.currentTimeMillis();
long now = 0;
if (millis < 0) {
throw new IllegalArgumentException("timeout value is negative");
}
//如果没有传入millis,直接调用的是Thread.join();那就会等Thread线程执行完才唤醒
if (millis == 0) {
while (isAlive()) {
wait(0);
}
} else {
//如果传入了超时时间,那么就只会在规定的时间内wait()
while (isAlive()) {
long delay = millis - now;
if (delay <= 0) {
break;
}
wait(delay);
now = System.currentTimeMillis() - base;
}
}
}
线程等待通知机制(wait、notify)
线程等待/通知机制简介
假设有两个线程,一个生产数据,一个消费数据。如何保证生产线程生产出数据后通知给消费线程,消费线程在没有数据可被消费情况下等待有数据呢?
- 一般的做法是采用轮询方法,一直 while 循环(中间睡眠几毫秒)判断是否有数据。该办法可能存在的问题是 难以确保及时性。在睡眠时,基本不消耗处理器资源,但是如果睡得过久,就不能及时发现条件已经变化,也就是及时性难以保证。
- 难以降低开销。如果降低睡眠的时间,比如休眠 1 毫秒,这样消费者能更加迅速地发现条件变化,但是却可能消耗更多的处理器资源,造成了无端的浪费。
Java 通过内置的等待/通知机制能够很好地解决这个矛盾并实现所需的功能。
等待/通知机制,是指一个线程 A 调用了对象 O 的 wait 方法进入等待状态,而另一个线程 B 调用了对象 O 的 notify 或者 notifyAll 方法,线程 A 收到通知后从对象 O 的 wait 方法返回,进而执行后续操作。上述两个线程通过对象 O 来完成交互,而对象上的 wait 和 notify/notifyAll 的 关系就如同开关信号一样,用来完成等待方和通知方之间的交互工作。
怎么实现线程等待/通知机制
Object 作为所有对象的父类,其中与等待通知机制相关几个方法如下:
- wait : 调用该方法线程进入 WAITING 状态,只有等待其他线程的通知或者被中断才会返回(调用后会释放锁,sleep 不会)
- wait(超时设置) : 在 wait 方法的基础上增加了超时,达到超时设置后如果没有通知或者中断也会返回
- notify : 通知一个在对象上等待的线程 A(调用过 wait 方法的线程),使其从 wait 方法返回,前提是该线程 A 获取到了对象锁。(多线程存在锁竞争)
- notifyAll : 通知所有等待在该对象上的线程
例子中,创建了两个线程-WaitThread 和 NotifyThread,前者检查 flag 值是否为 false,如果符合要求,进行后续操作,否则在 lock 上等待,后者在睡眠了一段时间 后对 lock 进行通知。
public class WaitAndNotifyExample {
public static boolean FLAG = true;
public static final Object LOCK = new Object();
public static void main(String[] args) {
new Thread(new Wait(), "[WaitThread]").start();
try {
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException ignored) {
}
new Thread(new Notify(), "[NotifyThread]").start();
}
}
class Wait implements Runnable {
@Override
public void run() {
synchronized (LOCK) {
final String threadName = Thread.currentThread().getName();
while (FLAG) {
System.out.println(new Date() + threadName + " FLAG = true , wait...");
try {
LOCK.wait();
} catch (InterruptedException ignored) {
}
}
System.out.println(new Date() + threadName + " FLAG = false,开始继续工作");
}
}
}
class Notify implements Runnable {
@Override
public void run() {
synchronized (LOCK) {
final String threadName = Thread.currentThread().getName();
System.out.println(new Date() + threadName + " 持有锁,发出通知");
LOCK.notifyAll();
FLAG = false;
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ignored) {
}
synchronized (LOCK) { // 再次加锁,目的:测试调用 notifyAll 方法后被唤醒的线程是否立即执行
System.out.println(new Date() + threadName + " 再次拿到锁. sleep @ ");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException ignored) {
}
}
}
}
}
执行细节说明:
- 使用 wait、notify 和 notifyAll 时需要先对调用对象加锁。
- 调用 wait 方法后,线程状态由 RUNNING 变为 WAITING,并将当前线程放置到对象的等待队列。
- notify 或 notifyAll 方法调用后,等待线程依旧不会从 wait() 返回,需要调用 notify 或 notifyAll 的线程释放锁之后,等待线程才有机会从 wait 返回。
- notify 方法将等待队列中的一个等待线程从等待队列中移到同步队列中,而 notifyAll 方法则是将等待队列中所有的线程全部移到同步队列,被移动的线程状态由 WAITING 变为 BLOCKED。
- 从 wait 方法返回的前提是获得了调用对象的锁。
线程等待/通知的经典范式
等待方遵循原则
- 获取对象的锁
- 如果条件不满足执行处理逻辑,那么调用对象的 wait 方法(被通知后仍要检查条件)。如果条件满足执行处理逻辑。
- 条件满足则执行对应的逻辑
伪代码:
synchronized(对象) {
while(条件判断) { // 循环判断条件是否满足,条件不满足时进入等待状态
对象.wait(); // wait 后释放锁,其他线程拿到锁后执行对于逻辑
}
对应的处理逻辑 // 其他线程调用 notify、notifyAll 后并释放锁后,继续运行该处代码
}
通知方遵循原则
- 获得对象的锁
- 改变条件(e.g. flag)
- 通知所有等待在对象上的线程
伪代码:
synchronized(对象) {
改变条件
对象.notifyAll();
}
为什么wait/notify需要在同步块里执行?
参考上面的经典范式,如果没有在同步块里:
- 等待方条件判断不符合时将执行 wait 方法
- 在执行 wait 方法前通知方刚好改变了条件并执行 notifyAll 方法
- 然后等待方执行了 wait 方法(可能永远不会被唤醒了,本来应该被唤醒的)
总结为:用 synchronized 确保在条件判断和 notify 之间不要调用 wait。保证线程的通信交流。
应用场景
多线程执行时,线程内部逻辑需要等待其他线程执行后满足条件才执行 wait 方法后续的逻辑
tip:Thread.join 是等待指定的一个线程执行完成后才执行后续的逻辑,wait 是未指定具体线程,可任意线程唤醒。
ThreadLocal 是什么
ThreadLocal 提供了线程内存储变量的能力,这些变量不同之处在于每一个线程读取的变量是对应的、互相独立的。通过 get 和 set 方法就可以得到当前线程对应的值。
ThreadLocal 怎么用
- 通过 set(T) 方法来设置一个值
- 在当前线程下再通过 get() 方法获取到原先设置的值
ThreadLocal 怎么实现的
ThreadLocal 表示每个线程存储的数据类型 T ,每个线程在一个 ThreadLocal 实例上只能存储一个值。
ThreadLocal set/get 方法源码解析:
public class ThreadLocal<T> {
public void set(T value) {
Thread t = Thread.currentThread(); // 获取当前线程
ThreadLocalMap map = getMap(t); // 获取当前线程内部维护的 threadLocals
if (map != null) { // 如果为空进行初始化,不为空进行设值
map.set(this, value);
} else {
createMap(t, value);
}
}
ThreadLocalMap getMap(Thread t) { // 获取当前线程内部维护的 threadLocals
return t.threadLocals;
}
void createMap(Thread t, T firstValue) { // 初始化线程内部维护的 threadLocals
t.threadLocals = new ThreadLocalMap(this, firstValue);
}
public T get() {
Thread t = Thread.currentThread(); // 获取当前线程
ThreadLocalMap map = getMap(t);
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this); // 不为空时获取对应 value
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;
}
}
return setInitialValue(); // 为空时获取初始化的 value 返回,初始化 value 默认为 null
}
}
ThreadLocalMap set/getEntry 方法源码解析:
private void set(ThreadLocal<?> key, Object value) {
Entry[] tab = table;
int len = tab.length;
int i = key.threadLocalHashCode & (len-1); //获取索引值
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) { // 遍历 tab 如果已经存在则更新值
ThreadLocal<?> k = e.get();
if (k == key) {
e.value = value;
return;
}
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}
tab[i] = new Entry(key, value); // 如果上面没有遍历成功则创建新值
int sz = ++size;
if (!cleanSomeSlots(i, sz) && sz >= threshold) // 满足条件数组扩容 *2
rehash();
}
private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1); //获取索引值
Entry e = table[i];
if (e != null && e.get() == key)
return e;
else
return getEntryAfterMiss(key, i, e);
}
为什么通过 int i = key.threadLocalHashCode & (len-1); 获取索引值?
public class ThreadLocal<T> {
private final int threadLocalHashCode = nextHashCode();
private static AtomicInteger nextHashCode =
new AtomicInteger();
private static final int HASH_INCREMENT = 0x61c88647;
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}
}
因为 static 的原因,在每次 new ThreadLocal 时因为 threadLocalHashCode 的初始化,会使 threadLocalHashCode 值自增一次,增量为 0x61c88647。
0x61c88647 是斐波那契散列乘数,它的优点是通过它散列 (hash) 出来的结果分布会比较均匀,可以很大程度上避免 hash 冲突。
应用场景
- 某些数据是以线程为作用域并且不同线程具有不同的数据副本的时候,就可以考虑采用 ThreadLocal
- 线程之间的通信
- AOP 切面编程,记录线程执行耗时情况
线程池是什么
线程池就是提前创建若干个线程,如果有任务需要处理,线程池里的线程就会处理任务,处理完之后线程并不会被销毁,而是等待下一个任务。
由于创建和销毁线程都是消耗系统资源的,所以当你想要频繁的创建和销毁线程的时候就可以考虑使用线程池来提升系统的性能。
为什么用线程池
- 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
- 提高响应速度。当任务到达时,任务可以不需要等到线程创建就能立即执行。
- 提高线程的可管理性。线程是稀缺资源,如果无限制地创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一分配、调优和监控。
线程池实现原理
当提交一个新任务到线程池时,线程池的处理流程如下:
1、线程池判断核心线程池里的线程是否都在执行任务。
如果不是,则创建一个新的工作线程来执行任务。
如果是,则进入下个流程。
2、线程池判断工作队列是否已经满。
如果工作队列没有满,则将新提交的任务存储在这个工作队列里。
如果工作队列满了,则进入下个流程。
3、线程池判断线程池的线程是否都处于工作状态。
如果没有,则创建一个新的工作线程来执行任务。
如果已经满了,则交给饱和策略来处理这个任务。
ThreadPoolExecutor 执行 execute 方法分下面 4 种情况。
- 1、如果当前运行的线程少于 corePoolSize(核心线程数量最大值),则创建新线程来执行任务 (注意,执行这一步骤需要获取全局锁 mainLock )。
- 2、如果运行的线程等于或多于 corePoolSize,则将任务加入 BlockingQueue。
- 3、如果无法将任务加入 BlockingQueue(队列已满),则创建新的线程来处理任务 (注意,执行这一步骤需要获取全局锁)。
- 4、如果创建新线程将使当前运行的线程超出 maximumPoolSize,任务将被拒绝,并调用 RejectedExecutionHandler.rejectedExecution() 方法。
ThreadPoolExecutor 采取上述步骤的总体设计思路,是为了在执行 execute 方法时,尽可地避免获取全局锁。在 ThreadPoolExecutor 完成预热之后 (当前运行的线程数大于等于 corePoolSize),几乎所有的 execute 方法调用都是执行步骤 2,而步骤 2 不需要获取全局锁。
ThreadPoolExecutor.execute 提交任务源码分析:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 运行的线程少于 corePoolSize 时则创建线程并执行当前任务
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
// 如线程数大于等于基本线程数或线程创建失败,则将当前任务放到工作队列中。
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
// 重新检查是否满足条件(可能上次检查后有线程完成)
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false)) // 如果无法将任务排队,则尝试添加新的线程
reject(command); // 添加失败,抛出 RejectedExecutionException 异常
}
工作线程运行机制
线程池创建线程时,会将线程封装成工作线程 Worker,Worker 在执行完任务后,还会循环获取工作队列里的任务来执行。
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // 允许中断
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) { // 循环获取任务
w.lock(); // 为什么加锁?参考文章下面解释
// 中断处理:如果池正在停止,或者运行 worker 被中断
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
try {
task.run(); // 运行任务逻辑
afterExecute(task, null);
} catch (Throwable ex) {
afterExecute(task, ex);
throw ex;
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
}
Worker 为什么在执行任务时候加锁?
- Worker 抑制线程真正开始运行任务之前的中断,将 lock 状态初始化为负值,并在启动时将其清除(在 runWorker 方法中)
- 锁被设计为非重入互斥锁,避免调用线程池方法 setCorePoolSize 时也能获取锁,保证等待工作的 Worker 是一个可以执行任务的,不是一个将要中断的 Worker
- 锁的控制保护了正在执行 task 的 Worker 不能被其他线程中断(必须竞争锁后才能执行中断)
创建线程池
- corePoolSize:核心线程数。当提交一个任务到线程池时,线程池会创建一个线程来执行任务,即使其他空闲的基本线程能够执行新任务也会创建线程,等到需要执行的任务数大于线程池基本大小时就不再创建。
- keepAliveTime:当线程数大于核心线程数量时,多余的线程多久后回收
- maximumPoolSize:线程池允许创建的最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。
- BlockingQueue:用于保存等待执行的任务的阻塞队列 BlockingQueue。可以选择以下几个阻塞队列:
- ArrayBlockingQueue: 是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出) 原则对元素进行排序。
- DelayQueue: 一个无界阻塞队列,只有在延迟期满时,才能从中提取元素,队列的头部,是延迟期满后保存时间最长的 delay 元素
- LinkedBlockingQueue: 一个基于链表结构的阻塞队列,此队列按 FIFO 排序元素,吞吐量通常要高于 ArrayBlockingQueue。
- LinkedBlockingDeque: 一个由链表结构组成的双向阻塞队列。
- LinkedTransferQueue: 一个由链表结构组成的无界阻塞队列。
- SynchronousQueue: 一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,否则插入操作一直处于阻塞状态,吞吐量通常要高于 Linked-BlockingQueue。
- PriorityBlockingQueue: 一个具有优先级的无限阻塞队列。
- ThreadFactory:创建线程的线程工厂。
- RejectedExecutionHandler:当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。这个策略默认情况下是 AbortPolicy,表示无法处理新任务时抛出异常。在 JDK 1.5 中 Java 线程池框架提供了以下 4 种策略。
- AbortPolicy:直接抛出异常。
- CallerRunsPolicy:只用调用者所在线程来运行任务。
- DiscardOldestPolicy:丢弃队列里最近的一个任务,并执行当前任务。
- DiscardPolicy:不处理,丢弃掉。
提交任务到线程池
可以使用两个方法向线程池提交任务,分别为 execute 和 submit 方法。
- execute 方法提交任务
execute 方法用于提交不需要返回值的任务。所以无法判断任务是否被线程池执行成功。
- submit 方法提交任务
submit 方法用于提交需要返回值的任务。
线程池会返回一个 Future 类型的对象,通过这个 Future 对象可以判断任务是否执行成功,并且可以通过 Future 的 get 方法来获取返回值,get 方法会阻塞当前线程直到任务完成,而使用 get(long timeout,TimeUnit unit) 方法则会阻塞当前线程一段时间后立即返回,这时候有可能任务没有执行完。
关闭线程池
通过调用线程池的 shutdown 或 shutdownNow 方法来关闭线程池。
遍历线程池中的工作线程,然后逐个调用线程的 interrupt 方法来中断线程,所以无法响应中断的任务可能永远无法终止。
但是它们存在一定的区别:
- shutdownNow 首先将线程池的状态设置成 STOP,然后尝试停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表。
- shutdown 只是将线程池的状态设置成 SHUTDOWN 状态,然后中断所有没有正在执行任务的线程。
- isShutdown: 只要调用了上面两个关闭方法中的任意一个,isShutdown 方法就会返回 true。
- isTerminated: 当所有的任务都已关闭后,才表示线程池关闭成功,这时调用 isTerminated 方法会返回 true。
合理地配置线程池
要想合理地配置线程池,就必须首先分析任务特性,可以从以下几个角度来分析:
任务的性质:CPU 密集型任务、IO 密集型任务和混合型任务。
任务的优先级:高、中和低。
任务的执行时间:长、中和短。
任务的依赖性:是否依赖其他系统资源,如数据库连接。
性质不同的任务可以用不同规模的线程池分开处理。
CPU 密集型任务应配置尽可能小的线程,如配置 cpu+1 个线程的线程池。由于 IO 密集型任务线程并不是一直在执行任务,则应配置尽可能多的线程,如 2*cpu。
混合型的任务,如果可以拆分,将其拆分成一个 CPU 密集型任务和一个 IO 密集型任务,只要这两个任务执行的时间相差不是太大,那么分解后执行的吞吐量将高于串行执行的吞吐量。
如果这两个任务执行时间相差太大,则没必要进行分解。可以通过 Runtime.getRuntime().availableProcessors() 方法获得当前设备的 CPU 个数。
优先级不同的任务可以使用优先级队列 PriorityBlockingQueue 来处理。它可以让优先级高的任务先执行。
Executor 框架是什么
Java 线程的创建与销毁需要一定的开销,因此为每一个任务创建一个新线程来执行,线程的创建与开销将浪费大量计算资源。而且,如果不对创建线程的数量做限制,可能会导致系统负荷太高而崩溃。
Java 的线程既是工作单元,也是执行机制。JDK1.5 之后,工作单元与执行机制分离,工作单元包括 Runnable 和 Callable,执行机制由 Executor 框架负责。
Executor 框架的两级调度模型
在 HotSpot VM 的线程模型中,Java 线程被一对一映射为本地操作系统线程。
- Java 线程启动时会创建一个本地操作系统线程。
- 当该 Java 线程终止时,这个操作系统线程也会被回收。操作系统会调度所有线程并将它们分配给可用的 CPU。
两级调度模型描述为:
- 在上层,Java 多线程程序通常把应用分解为若干个任务,然后使用用户级的调度器 (Executor 框架) 将这些任务映射为固定数量的线程;
- 在底层,操作系统内核将这些线程映射到硬件处理器上。
Executor 框架组成
Executor 框架主要由 3 大部分组成如下:
- 任务的提交:包括被执行任务需要实现的接口:Runnable 接口或 Callable 接口。
- 任务的执行:包括任务执行机制的核心接口 Executor,以及继承自 Executor 的 ExecutorService 接口。Executor 框架有两个关键类实现了 ExecutorService 接口 (ThreadPoolExecutor 和 ScheduledThreadPoolExecutor)。
- 任务的结果:异步计算的结果,包括接口 Future 和实现 Future 接口的 FutureTask 类。
任务的执行核心类说明
- Executor : 是一个接口,它是 Executor 框架的基础,它将任务的提交与任务的执行分离开
- ExecutorService : 继承 Executor 接口,提供了任务的提交和停止的方法
- AbstractExecutorService : 是 ExecutorService 的抽象实现
- ThreadPoolExecutor : 是 AbstractExecutorService 的核心实现类,用来执行被提交的任务
- ScheduledExecutorService : 继承 ExecutorService 接口,额外提供了可以在给定的延迟后运行命令,或者定期执行命令的 schedule 方法
- ScheduledThreadPoolExecutor : 是 ScheduledExecutorService 的核心实现类,它比 Timer 更灵活,功能更强大
- ForkJoinPool : 支持将一个任务拆分成多个“小任务”并行计算,再把多个“小任务”的结果合并成总的计算结果
任务提交与结果返回核心类说明
- Runnable 接口: 任务方法无返回结果,可以被 ExecutorService 接口执行。
- Callable 接口: 任务方法有返回结果,可以被 ExecutorService 接口执行。
- Future 接口: 实现类代表异步计算的结果。
- FutureTask 代表异步计算的结果
- ScheduledFutureTask 代表异步计算的结果,提供了getDelay任务剩余的延迟时间、isPeriodic任务是否为周期性任务
Executors 工厂方法类说明
Executors 是一个静态的工厂方法类,提供快速创建线程池等操作。
new XX 方法,使用常用的配置设置创建 ExecutorService
- newFixedThreadPool-固定线程数的线程池
- newSingleThreadExecutor-单个线程的线程池
- newCachedThreadPool-根据需要创建新线程的线程池
new XX 方法,使用常用的配置设置创建 ScheduledExecutorService
- newScheduledThreadPool-ScheduledExecutorService
- newSingleThreadScheduledExecutor-单个线程的 ScheduledExecutorService
unconfigurableXX 方法,包装一个不可配置的线程池
- unconfigurableExecutorService
- unconfigurableScheduledExecutorService
各类线程池的创建,创建方法都是Executors 中的静态方法
ThreadPoolExecutor 详解
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
FixedThreadPool 被称为可重用固定线程数的线程池。
参数设置说明
- corePoolSize 和 maximumPoolSize 都被设置为创建 FixedThreadPool 时指定的参数 nThreads。当线程池中的线程数大于 corePoolSize 时,keepAliveTime 为多余的空闲线程等待新任务的最长时间,超过这个时间后多余的线程将被终止。
- keepAliveTime 设置为 0L,意味着多余的空闲线程会被立即终止。
- FixedThreadPool 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。
优缺点
- 优点是多任务并行运行,最大并行运行线程数量是固定的
- 优点是能够保证所有的任务都被执行,永远不会拒绝新的任务
- 缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题
SingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
SingleThreadExecutor 是使用单个线程的线程池。
参数设置及影响说明
- corePoolSize 和 maximumPoolSize 被设置为 1。其他参数与 FixedThreadPool 相同。
- 使用无界队列 LinkedBlockingQueue 作为线程池的工作队列(队列的容量为 Integer.MAX_VALUE)。
优缺点
- 优点是适用于在逻辑上需要单线程处理任务的场景,最大并行运行线程数量固定为1
- 优点是能够保证所有的任务都被执行,永远不会拒绝新的任务
- 缺点是队列数量没有限制,在任务执行时间无限延长的这种极端情况下会造成内存问题
CachedThreadPool
CachedThreadPool 是一个会根据需要创建新线程的线程池。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
参数设置及影响说明
- corePoolSize 被设置为 0,即 核心线程
- maximumPoolSize 被设置为 Integer.MAX_VALUE,即 maximumPool 是无界的。
- keepAliveTime 设置为 60L,意味着 CachedThreadPool 中的空闲线程等待新任务的最长时间为 60 秒,空闲线程超过 60 秒后将会被 终止。
- SynchronousQueue 是一个没有容量的阻塞队列。每个插入操作必须等待另一个线程的对应移除操作,反之亦然。
- CachedThreadPool 使用 SynchronousQueue,把主线程提交的任务传递给空闲线程执行。
优缺点
- 优点是多任务并行运行,最大并行运行线程数量是不固定的,随着新任务到达可持续创建新的线程
- 优点是能够保证所有的任务都被执行,永远不会拒绝新的任务
- 缺点是极端情况下,处理速度小于任务提交速度时,会因为创建过多线程而耗尽 CPU 和内存资源
ScheduledThreadPoolExecutor 详解
ScheduledThreadPoolExecutor 继承自 ThreadPoolExecutor。它主要用来在给定的延迟之后运行任务,或者定期执行任务。
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE,
0, NANOSECONDS,
new DelayedWorkQueue());
}
DelayQueue 是一个无界队列,所以 maximumPoolSize 在 ScheduledThreadPoolExecutor 中没有什么意义(设置 maximumPoolSize 的大小没有什么效果)。
ScheduledThreadPoolExecutor 的执行主要分为两大部分:
- 当调用 scheduleAtFixedRate 方法或者 scheduleWithFixedDelay 方法时,会向 DelayQueue 添加一个实现了 RunnableScheduledFuture 接口的 ScheduledFutureTask。
- 线程池中的线程从 DelayQueue 中获取 ScheduledFutureTask,然后执行任务。
ScheduledThreadPoolExecutor 如何获取执行任务
- long 型成员变量 time,表示这个任务将要被执行的具体时间。
- long 型成员变量 sequenceNumber,表示这个任务被添加到 ScheduledThreadPoolExecutor 中的序号。
- long 型成员变量 period,表示任务执行的间隔周期。
DelayQueue 封装了一个 PriorityQueue,这个 PriorityQueue 会对队列中的 ScheduledFutureTask 进行排序。
排序时,time 小的排在前面(时间早的任务将被先执行)。
如果两个 ScheduledFutureTask 的 time 相同,就比较 sequenceNumber,sequenceNumber 小的排在前面(也就是说,如果两个任务的执行时间相同,那么先提交的任务将被先执行)
ScheduledThreadPoolExecutor 与 ThreadPoolExecutor 比较
ScheduledThreadPoolExecutor 为了实现周期性的执行任务,对 ThreadPoolExecutor 做了如下的修改。
- 使用 DelayQueue 作为任务队列。
- 获取任务的方式不同。
- 执行周期任务后,增加了额外的处理。
FutureTask 详解
FutureTask 的状态转换
private static final int NEW = 0;//新建
private static final int COMPLETING = 1;//完成
private static final int NORMAL = 2;//正常
private static final int EXCEPTIONAL = 3;//异常
private static final int CANCELLED = 4;//取消
private static final int INTERRUPTING = 5;//中断中
private static final int INTERRUPTED = 6;//中断
可能的状态转换:
- 新建-> 完成-> 正常
- 新建-> 完成-> 异常
- 新建-> 取消
- 新建-> 中断中-> 中断
FutureTask 的实现
FutureTask.get 方法执行过程
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
// 计算等待截止时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
// 1. 判断阻塞线程是否被中断,如果被中断则在等待队
// 列中删除该节点并抛出 InterruptedException 异常
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}
// 2. 获取当前状态,如果状态大于 COMPLETING
// 说明任务已经结束 (要么正常结束,要么异常结束,要么被取消)
// 则把 thread 显示置空,并返回结果
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
// 3. 如果状态处于中间状态 COMPLETING
// 表示任务已经结束但是任务执行线程还没来得及给 outcome 赋值
// 这个时候让出执行权让其他线程优先执行
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
// 4. 如果等待节点为空,则构造一个等待节点
else if (q == null)
q = new WaitNode();
// 5. 如果还没有入队列,则把当前节点加入 waiters 首节点并替换原来 waiters
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q);
else if (timed) {
// 如果需要等待特定时间,则先计算要等待的时间
// 如果已经超时,则删除对应节点并返回对应的状态
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
// 6. 阻塞等待特定时间
LockSupport.parkNanos(this, nanos);
}
else
// 6. 阻塞等待直到被其他线程唤醒
LockSupport.park(this);
}
}
get 方法源码解析:
- 判断任务当前的 state <= COMPLETING 是否成立。前面分析过,COMPLETING 状态是任务是否执行完成的临界状态。
- 如果成立,表明任务还没有结束 (这里的结束包括任务正常执行完毕,任务执行异常,任务被取消),则会调用 awaitDone 进行阻塞等待。
- 如果不成立表明任务已经结束,调用 report 返回结果。
awaitDone 方法源码解析:
awaitDone 中有个死循环,每一次循环都会:
- 判断调用 get 的线程是否被其他线程中断,如果是的话则在等待队列中删除对应节点然后抛出 InterruptedException 异常。
- 获取任务当前状态,如果当前任务状态大于 COMPLETING 则表示任务执行完成,则把 thread 字段置 null 并返回结果。
- 如果任务处于 COMPLETING 状态,则表示任务已经处理完成 (正常执行完成或者执行出现异常),但是执行结果或者异常原因还没有保存到 outcome 字段中。这个时候调用线程让出执行权让其他线程优先执行。
- 如果等待节点为空,则构造一个等待节点 WaitNode。
- 如果第四步中新建的节点还没如队列,则 CAS 的把该节点加入 waiters 队列的首节点。
- 阻塞等待。
假设当前 state=NEW 且 waiters 为 NULL,也就是说还没有任何一个线程调用 get 获取执行结果,这个时候有两个线程 threadA 和 threadB 先后调用 get 来获取执行结果。再假设这两个线程在加入阻塞队列进行阻塞等待之前任务都没有执行完成且 threadA 和 threadB 都没有被中断的情况下 (因为如果 threadA 和 threadB 在进行阻塞等待结果之前任务就执行完成或线程本身被中断的话,awaitDone 就执行结束返回了),执行过程是这样的,以 threadA 为例:
- 第一轮 for 循环,执行的逻辑是 q == null,所以这时候会新建一个节点 q。第一轮循环结束。
- 第二轮 for 循环,执行的逻辑是!queue,这个时候会把第一轮循环中生成的节点的 next 指针指向 waiters,然后 CAS 的把节点 q 替换 waiters。也就是把新生成的节点添加到 waiters 链表的首节点。如果替换成功,queued=true。第二轮循环结束。
- 第三轮 for 循环,进行阻塞等待。要么阻塞特定时间,要么一直阻塞知道被其他线程唤醒。
FutureTask.run 方法执行过程
public void run() {
// 1. 状态如果不是 NEW,说明任务或者已经执行过,或者已经被取消,直接返回
// 2. 状态如果是 NEW,则尝试把当前执行线程保存在 runner 字段中
// 如果赋值失败则直接返回
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 3. 执行任务
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 4. 任务异常
setException(ex);
}
if (ran)
// 4. 任务正常执行完毕
set(result);
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
// 5. 如果任务被中断,执行中断处理
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
- 判断当前任务的 state 是否等于 NEW,如果不为 NEW 则说明任务或者已经执行过,或者已经被取消,直接返回。
- 如果状态为 NEW 则接着会通过 unsafe 类把任务执行线程引用 CAS 的保存在 runner 字段中,如果保存失败,则直接返回。
- 执行任务。
- 如果任务执行发生异常,则调用 setException 方法保存异常信息
三、Lock
为什么要设计 Lock 接口
锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时访问共享资源 (但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。
在 Lock 接口出现之前,Java 程序是靠 synchronized 关键字实现锁功能的,而 Java SE 5 之后,并发包中新增了 Lock 接口 (以及相关实现类) 用来实现锁功能,它提供了与 synchronized 关键字类似的同步功能, 只是在使用时需要显式地获取和释放锁。虽然它缺少了 (通过 synchronized 块或者方法所提供的) 隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以及超时获取锁等多种 synchronized 关键字所不具备的同步特性。
使用 synchronized 关键字将会隐式地获取锁,但是它将锁的获取和释放固化了,也就是先获取再释放。当然,这种方式简化了同步的管理,可是扩展性没有显示的锁获取和释放来的好。
例如,针对一个场景,手把手进行锁获取和释放,先获得锁 A,然后再获取锁 B,当锁 B 获得后,释放锁 A 同时获取锁 C,当锁 C 获得后,再释放 B 同时获取锁 D,以此类推。这种场景下,synchronized 关键字就不那么容易实现了,而使用 Lock 却容易许多。
如何使用 Lock
- 不要将获取锁的过程写在 try 块中,因为如果在获取锁 (自定义锁的实现) 时发生了异常,异常抛出的同时,也会导致锁无故释放。
- 在 finally 块中释放锁,目的是保证在获取到锁之后,最终能够被释放。
正确的使用方式:
Lock lock = new ReentrantLock();
lock.lock();
try {
// 业务逻辑处理
} finally {
lock.unlock();
}
Lock 的 API 说明
public interface Lock {
// 阻塞地获取锁,直到获取到锁才返回,而且是不可中断的。
void lock();
// 可中断地获取锁,与 lock() 方法的不同之处,在于该方法在阻塞等待锁的过程中会响应中断。
void lockInterruptibly() throws InterruptedException;
// 尝试非阻塞地获取锁,即调用该方法后,立刻返回,成功获取锁,返回 true,失败则返回 false。
boolean tryLock();
// 可中断的超时获取锁,在以下 3 种情况下会返回:
// 1. 当前线程在指定时间内获得了锁;
// 2. 当前线程在指定时间内被中断;
// 3. 指定时间结束(超时结束),返回 false;
boolean tryLock(long time, TimeUnit unit) throws InterruptedException;
// 释放锁
void unlock();
// 等待通知组件,当前线程只有获得了锁,才能调用该组件的 wait() 方法,调用后,线程将会释放锁
Condition newCondition();
}
Lock 接口的实现及依赖
图例:
- 红色线表示内部类
- 绿色虚线表示接口实现关系
- 蓝色实线表示类继承关系
Lock 接口的实现类主要有:
- 重入锁(ReentrantLock)
- 读锁(ReadLock)
- 写锁(WriteLock)
Lock 接口的实现基本都是通过聚合了一个同步器(AbstractQueuedSynchronizer 缩写为 AQS)的子类来完成线程访问控制的。
AbstractQueuedSynchronizer介绍
队列同步器介绍
队列同步器 AbstractQueuedSynchronizer(以下简称同步器或者 AQS),是用来构建锁或者其他同步组件的基础框架,它使用了一个 int 成员变量表示同步状态,通过内置的 FIFO 队列来完成资源获取线程的排队工作,并发包的作者 (Doug Lea) 期望它能够成为实现大部分同步需求的基础。
如何使用?
同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的 3 个方法 (getState()、setState(int newState) 和 compareAndSetState(int expect,int update)) 来进行操作,因为它们能够保证状态的改变是安全的。
子类推荐被定义为自定义同步组件的静态内部类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来供自定义同步组件使用,
同步器既可以支持独占式地获取同步状态,也可以支持共享式地获取同步状态,这样就可以方便实现不同类型的同步组件 (ReentrantLock、 ReentrantReadWriteLock 和 CountDownLatch 等)。
同步器与锁的关系
同步器是实现锁 (也可以是任意同步组件) 的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。可以这样理解二者之间的关系:
- 锁是面向使用者的,它定义了使用者与锁交互的接口 (比如可以允许两个线程并行访问),隐藏了实现细节;
- 同步器面向的是锁的实现者,它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。锁和同步器很好地隔离了使用者和实现者所需关注的领域。
同步器方法说明
操作同步状态的方法
// 获取当前同步状态
protected final int getState()
// 设置当前同步状态
protected final void setState(int newState)
// 使用 CAS 设置当前状态,该方法能够保证状态设置的原子性
protected final boolean compareAndSetState(int expect, int update)
继承时可重写的方法
// 独占式获取状态。实现需要查询当前状态是否符合预期,然后再使用 CAS 设置同步状态
protected boolean tryAcquire(int arg)
// 独占式释放状态,等待获取同步状态的线程将有机会获取同步状态
protected boolean tryRelease(int arg)
// 共享式获取状态,返回≥0 的值表示成功,反之获取失败
protected int tryAcquireShared(int arg)
// 共享式释放同步状态
protected boolean tryReleaseShared(int arg)
// 在独占式模式下判断是否被线程占用
protected boolean isHeldExclusively()
默认提供的模板方法
同步器提供的模板方法基本上分为 3 类:
- 独占式同步状态获取与释放
- 共享式同步状态获取与释放
- 同步状态和查询同步队列中的等待线程情况
自定义同步组件将使用同步器提供的模板方法来实现自己的同步语义。以下为 AQS 模板方法:
// ---------------------- 独占式相关操作 ----------------------
// 独占模式获取,忽略中断。调用一次重写的 tryAcquire 方法,在成功时返回。
// 否则,线程进入同步队列等待,直到调用 tryAcquire 成功为止。
// 这种方法可以用来实现方法 Lock.lock
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
// 与 acquire(int arg) 相同,但是该方法响应中断
public final void acquireInterruptibly(int arg) throws InterruptedException...
// 与 acquireInterruptibly(int arg) 相同,支持超时返回
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException...
// 独占式的释放同步状态,该方法会在释放同步状态后将同步队列中的第一个节点包含的线程唤醒
public final boolean release(int arg)
// ---------------------- 共享式相关操作 ----------------------
// 共享式的获取同步状态,与独占式获取的主要区别是同一时刻可以有多个线程获取同步状态
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
// 与 acquireShared(int arg) 相同,但是该方法响应中断
public final void acquireSharedInterruptibly(int arg) throws InterruptedException
// 与 acquireSharedInterruptibly(int arg) 相同,支持超时返回
public final boolean tryAcquireNanos(int arg, long nanosTimeout) throws InterruptedException
// 共享式的释放同步状态
public final boolean releaseShared(int arg)
基于同步器的独占式锁的实现示例
参考同步器方法说明中的内容我们定义一个独占式锁,定义一个 Sync 继承于 AQS。所有与锁相关的语义交于 Sync 完成。
public class MutexLockExample implements Lock {
// 静态内部类,自定义同步器,重写 AQS 的方法
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
@Override
protected boolean isHeldExclusively() {
return getState() == 1;
}
// 当状态为 0 的时候获取锁
@Override
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
// 释放锁,将状态设置为 0
@Override
protected boolean tryRelease(int releases) {
if (getState() == 0) throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
// 返回一个 Condition,每个 condition 都包含了一个 condition 队列
protected Condition newCondition() {
return new ConditionObject();
}
}
// 将 Lock 方法的实现代理到 Sync 实现
private final Sync sync = new Sync();
@Override
public void lock() { sync.acquire(1); }
@Override
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
@Override
public boolean tryLock() { return sync.tryAcquire(1); }
@Override
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public boolean isLocked() { return sync.isHeldExclusively(); }
@Override
public void unlock() { sync.release(1); }
@Override
public Condition newCondition() { return sync.newCondition(); }
}
同步器如何维护线程状态?
同步器的实现依赖于一个 FIFO 队列,那么队列中的元素 Node 就是保存着线程引用和线程状态的容器,每个线程对同步器的访问,都可以看做是队列中的一个节点(Node)。
节点是构成同步队列的基础,同步器拥有首节点 (head) 和尾节点 (tail),没有成功获取同步状态的线程将会成为节点加入该队列的尾部。首节点的线程在释放同步状态时,将会唤醒后继节点,而后继节点将会在获取同步状态成功时将自己设置为首节点。
节点(Node)介绍
AbstractQueuedSynchronizer#Node 的主要包含以下成员变量:
static final class Node {
/**
* 表示节点的状态。其中包含的状态有:
* CANCELLED,值为 1,表示当前的线程被取消;
* SIGNAL, 值为-1,表示当前节点的后继节点包含的线程需要运行,也就是 unpark;
* CONDITION,值为-2,表示当前节点在等待 condition,也就是在 condition 队列中;
* PROPAGATE,值为-3,表示当前场景下后续的 acquireShared 能够得以执行;
* 值为 0,不是以上状态时(新节点入队时的默认状态)
*/
volatile int waitStatus;
// 前驱节点,比如当前节点被取消,那就需要前驱节点和后继节点来完成连接。
volatile Node prev;
// 后继节点。
volatile Node next;
// 入队列时的当前线程。
volatile Thread thread;
// 存储 condition 队列中的后继节点。
Node nextWaiter;
}
首节点尾节点设置
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer
implements java.io.Serializable {
// 设置首节点方法
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
// 设置尾节点方法
private Node enq(Node node) {
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) { // CAS 操作
oldTail.next = node;
return oldTail;
}
} else {
initializeSyncQueue();
}
}
}
}
设置首节点方法说明
通过获取同步状态成功的线程来完成,由于只有一个线程能够成功获取到同步状态。
因此设置头节点的方法并不需要使用 CAS 来保证,它只需要将首节点设置成为原首节点的后继节点并断开原首节点的 next 引用即可
设置尾节点方法说明
由于线程无法获取到同步状态时,转而被构造成节点并加入到同步队列,可能存在多个线程一起设置为尾节点的动作,必须保证线程安全。
因此内部通过方法 compareAndSetTail 设置,它需要传递当前线程“认为”的尾节点和当前节点,只有设置成功后,当前节点才正式与之前的尾节点建立关联。
AbstractQueuedSynchronizer原理
独占式同步状态获取与释放
独占式同步状态获取
通过调用同步器的 acquire 方法可以获取同步状态
public final void acquire(int arg) {
if (!tryAcquire(arg) && // 尝试获取同步状态(自定义重写该方法)
acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) // 获取同步失败时,构造同步节点加入队列
selfInterrupt();
// 独占式 Node.EXCLUSIVE : 同一时刻只能有一个线程成功获取同步状态
}
// 将节点添加至同步队列尾部
private Node addWaiter(Node mode) {
Node node = new Node(mode);
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) { // CAS 方法更新到队列尾部
oldTail.next = node;
return node;
}
} else {
initializeSyncQueue(); // 初始化同步队列
}
}
}
final boolean acquireQueued(final Node node, int arg) {
boolean interrupted = false;
try {
for (;;) {
final Node p = node.predecessor(); // 获取当前节点的前一节点 p
if (p == head && tryAcquire(arg)) { // 如果是头节点再次尝试获取同步状态
setHead(node);
p.next = null; // help GC
return interrupted;
}
// 判断是否可以进入等待状态(前一节点 waitStatus=Node.SIGNAL 时)
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt(); // 等待过程是否被中断
}
} catch (Throwable t) { // 异常情况下取消获取并做中断处理
cancelAcquire(node);
if (interrupted)
selfInterrupt();
throw t;
}
}
- 首先调用自定义同步器实现的 tryAcquire 方法,该方法保证线程安全的获取同步状态
- 如果同步状态获取失败,则构造同步节点并通过 addWaiter 方法将该节点加入到同步队列的尾部
- 最后调用 acquireQueued 方法,使得该节点以“死循环”的方式获取同步状态。
- 如果获取不到,判断自己是否要进入等待状态,进入等待状态后唤醒主要依靠前驱节点的出队或阻塞线程被中断来实现。
- 如果获取到,将自己设置为首节点
独占式同步状态释放
释放同步状态,使得后续节点能 够继续获取同步状态。通过调用同步器的 release 方法可以释放同步状态,该方法在释放了同步状态之后,会唤醒其后驱节点 (使后驱节点重新尝试获取同步状态)
public final boolean release(int arg) {
if (tryRelease(arg)) { // 自定义释放逻辑
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h); // 唤醒后续节点
return true;
}
return false;
}
private void unparkSuccessor(Node node) {
int ws = node.waitStatus;
if (ws < 0) // 如果状态小于 0 ,尝试修改为 0
node.compareAndSetWaitStatus(ws, 0);
// 获取当前释放节点的后驱节点
// 如果后驱节点为空或者等待状态>0 时,表示已被取消
// 从后向前遍历寻找有效的节点进行唤醒。此时,再和 acquireQueued 方法联系起来
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node p = tail; p != node && p != null; p = p.prev)
if (p.waitStatus <= 0)
s = p;
}
if (s != null)
LockSupport.unpark(s.thread); // 唤醒
}
- 首先调用自定义同步器实现的 tryRelease 方法进行释放操作
- 如果释放成功尝试唤醒后续节点,唤醒逻辑为:
- 当前头节点状态重置为 0
- 依次循环寻找一个有效的节点进行唤醒
共享式同步状态获取与释放
共享式获取与独占式获取最主要的区别在于同一时刻能否有多个线程同时获取到同步状态(可重入锁)。
共享式同步状态获取
相对于独占的锁的 tryAcquire 返回 boolean 类型的值,共享锁的 tryAcquireShared 返回的是一个整型值:
- 如果该值小于 0,则代表当前线程获取共享锁失败
- 如果该值大于 0,则代表当前线程获取共享锁成功,并且接下来其他线程尝试获取共享锁的行为很可能成功
- 如果该值等于 0,则代表当前线程获取共享锁成功,但是接下来其他线程尝试获取共享锁的行为会失败
因此,只要该返回值大于等于 0,就表示获取共享锁成功。
public final void acquireShared(int arg) {
if (tryAcquireShared(arg) < 0)
doAcquireShared(arg);
}
private void doAcquireShared(int arg) {
final Node node = addWaiter(Node.SHARED); // 构造一个共享式节点加入等待队列
boolean interrupted = false;
try {
// 共享式获取的自旋过程中
// 成功获取到同步状态并退出自旋的条件就是 tryAcquireShared 方法返回值大于等于 0
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) { // 返回值大于等于 0 时,表示能够获取到同步状态
setHeadAndPropagate(node, r); // 设置头节点并尝试唤醒后驱节点(重点)
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node))
interrupted |= parkAndCheckInterrupt();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
} finally {
if (interrupted)
selfInterrupt();
}
}
独占与共享获取同步状态主要差异:
独占锁的 acquireQueued 调用的是 addWaiter(Node.EXCLUSIVE),而共享锁调用的是 addWaiter(Node.SHARED)
获取锁成功后的行为,对于独占锁而言,是直接调用了 setHead 方法,而共享锁调用的是 setHeadAndPropagate
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 记录现在的头节点,后面 if 中重新赋值(处理多线程同时设置头节点情况)
setHead(node);
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 在共享锁模式下,锁可以被多个线程所共同持有,既然当前线程已经拿到共享锁了,
// 那么就可以直接通知后驱节点来拿锁,而不必等待锁被释放的时候再通知。
if (s == null || s.isShared())
doReleaseShared(); // 该方法参考《共享式同步状态释放》中解释
}
}
共享式同步状态释放
共享式释放同步状态之后,将会唤醒后续处于等待状态的节点。
它和独占式主要区别在于 tryReleaseShared 方法必须确保同步状态线程安全释放,一般是通过循环和 CAS 来保证的,因为释放同步状态的操作会同时来自多个线程。
在共享锁模式下,头节点就是持有共享锁的节点,在它释放共享锁后,也应该唤醒它的后驱节点。唤醒方法 doReleaseShared 可能会同一个头节点调用 2 次:
- setHeadAndPropagate 方法中调用
- releaseShared 方法中调用,当前的头节点可能易主了
为什么要调用 2 次?
- 假设有 A/B/C 三个线程依次获取锁,谁拿到锁后同时设置为头节点,所以头节点可能依次为 A/B/C。
- 在多线程共享模式下,A/B 可能同时获取锁。A 拿到锁后,头节点为 A ,此时 B 重入进来拿到锁(A 此时还没释放)本应该头节点为 A 时确成了 B。这时头节点易主了。
- 因此在共享模式下获取锁后设置头节点后,针对当前头节点执行一次唤醒方法。
- 因此在释放过程中会对头节点进行判断,如果头节点被修改了就继续循环判断,执行唤醒方法。
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) { // 注意这里说明了队列至少有两个节点(2 个说明有一个可能会被唤醒)
int ws = h.waitStatus;
if (ws == Node.SIGNAL) { // 头节点的后驱节点需要被唤醒
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // CAS 修改头节点状态为 0
unparkSuccessor(h); // 修改成功时进行唤醒操作
}
// 如果头节点状态已经是 0 时,尝试修改状态为 PROPAGATE
// 如果尝试修改失败时说明说明有新的节点入队了,ws 的值被改为了 Node.SIGNAL
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue;
}
if (h == head) // 如果 head 没有被修改跳出循环
break;
}
}
超时获取同步状态
同步器支持以超时获取同步状态,即在指定的时间段内获取同步状态,如果获取到同步状态则返回 true,否则,返回 false。该方法提供了传统 Java 同步操作 (比如 synchronized 关键字) 所不具备的特性。
支持超时获取方法如下:
// 独占式超时获取同步状态
public final boolean tryAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException
// 共享式超时获取同步状态
public final boolean tryAcquireSharedNanos(long arg, long nanosTimeout)
throws InterruptedException
响应中断的同步状态获取过程
- 在 Java 5 之前,当一 个线程获取不到锁而被阻塞在 synchronized 之外时,对该线程进行中断操作,此时该线程的中断标志位会被修改,但线程依旧会阻塞在 synchronized 上,等待着获取锁。
- 在 Java 5 中,同步器 提供了 acquireInterruptibly(int arg) 方法,这个方法在等待获取同步状态时,如果当前线程被中断,会立刻返回,并抛出 InterruptedException。
以独占式超时获取同步方法说明
针对超时获取,独占式获取同步状态 acquire 在流程上非常相似,其主要区别在于未获取到同步状态时的处理逻辑,在主要需要计算出需要睡眠的时间间隔 nanosTimeout,为了防止过早通知。
处理逻辑主要为:
- nanosTimeout 超时时间
- 超时截止时间 deadline = System.nanoTime() + nanosTimeout;
- for 中每次重新计算超时时间 nanosTimeout = deadline - System.nanoTime();
- 如果当前超时时间 < 1000 时一直自旋,不进入睡眠,否则睡眠超时时间
private boolean doAcquireNanos(long arg, long nanosTimeout)
throws InterruptedException {
if (nanosTimeout <= 0L)
return false;
final long deadline = System.nanoTime() + nanosTimeout; // 超时时间戳
final Node node = addWaiter(Node.EXCLUSIVE);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return true;
}
// 计算需要等待时间(nanoseconds)
nanosTimeout = deadline - System.nanoTime();
if (nanosTimeout <= 0L) { // 等待时间 <= 0 表示已经超时,返回 false
cancelAcquire(node);
return false;
}
// 如果超时时间 > 1000L/nanoseconds 不进行睡眠一直循环
if (shouldParkAfterFailedAcquire(p, node) &&
nanosTimeout > SPIN_FOR_TIMEOUT_THRESHOLD)
LockSupport.parkNanos(this, nanosTimeout); // 睡眠计算出的等待时间
if (Thread.interrupted())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
重入锁-ReentrantLock
重入锁 ReentrantLock,就是支持重进入的锁,它表示该锁能够支持一个线程对资源的重复加锁。除此之外,该锁的还支持获取锁时的公平和非公平性选择。
ReentrantLock 虽然没能像 synchronized 关键字一样支持隐式的重进入,但是在调用 lock 方法时,已经获取到锁的线程,能够再次调用 lock 方法获取锁而不被阻塞。
公平和非公平选择
这里提到一个锁获取的公平性问题,如果在绝对时间上,先对锁进行获取的请求一定先被满足,那么这个锁是公平的,反之,是不公平的。
公平的获取锁,也就是等待时间最长的线程最优先获取锁,也可以说锁获取是顺序的。
ReentrantLock 提供了一个构造函数,能够控制锁是否是公平的。
重入机制需要解决的问题
重进入是指任意线程在获取到锁之后能够再次获取该锁而不会被锁所阻塞,该特性的实现需要解决以下两个问题:
- 线程再次获取锁。锁需要去识别获取锁的线程是否为当前占据锁的线程,如果是,则再次成功获取。
- 锁的最终释放。线程重复 n 次获取了锁,随后在第 n 次释放该锁后,其他线程能够获取到该锁。
在代码层面,锁的最终释放要求锁:
- 锁被获取是,计数自增,计数表示当前锁被重复获取的次数;
- 锁被释放时,计数自减,当计数等于 0 时表示锁已经成功释放。
如何使用?
以下代码演示一个非公平锁对 value 的并发操作
@Slf4j
public class ReentrantLockExample {
int value = 0;
private final ReentrantLock lock = new ReentrantLock(false);
public void writer() {
lock.lock(); // 获取锁
try {
//value++;
lock.lock(); // 再次获取锁(重入)
try {
value++;
log.info("当前线程持有锁数 = {}", lock.getHoldCount()); // 2
} finally {
lock.unlock(); // 释放锁
}
} finally {
lock.unlock(); // 再次释放锁
}
}
}
实现机制分析
因为 ReentrantLock 支持公平与非公平选择,内部实现机制为:
- 内部基于 AbstractQueuedSynchronizer(AQS)实现一个公平与非公平公共的父类 Sync ,用于管理同步状态
- FairSync 继承 Sync 用于处理公平问题
- NonfairSync 继承 Sync 用于处理非公平问题
公平和非公平实现主要的区别在于公平锁加锁时调用 hasQueuedPredecessors 方法取出等待队列的前驱节点。
优缺点
事实上,公平的锁机制往往没有非公平的效率高,但是,并不是任何场景都是以 TPS 作为唯一的指标。
- 公平锁能够减少“饥饿”发生的概率,等待越久的请求越是能够得到优先满足。
- 非公平性锁虽然可能造成线程“饥饿”,但极少的线程切换,保证了其更大的吞吐量。
-
此处直接简要对比 synchronized 、ReentrantLock 即可分析使用场景。
实际代码逻辑中是否需要中断、状态感知、锁独特的功能判断。
锁中断操作
- synchronized:不支持中断操作
- ReentrantLock:支持中断,支持超时中断
锁功能性
- synchronized:独占锁、可重入锁
- ReentrantLock:可重入锁、公平非公平选择
锁状态感知
- synchronized:无法判断是否拿到锁
- ReentrantLock:可以判断是否拿到锁
读写锁(ReentrantReadWriteLock)
锁 (如 Mutex 和 ReentrantLock、synchronized) 基本都是排他锁。 读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读线程和其他写线程均被阻塞。 读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写锁,使得并发性相比一般的排他锁有了很大提升。
不用读写锁怎么实现读写交互
在没有读写锁支持的 (Java 5 之前) 时候,如果需要完成上述工作就要使用 Java 的等待通知机制。
当写操作开始时,所有晚于写操作的读操作均会进入等待状态,只有写操作完成并进行通知之后,所有等待的读操作才能继续执行 (写操作之间依靠 synchronized 关键进行同步),这样做的目的是使读操作能读取到正确的数据,不会出现脏读。
读写锁-特性
- 公平性:支持公平非公平选择,默认非公平
- 重入性:读线程获取读锁后可以再次获取读锁。写线程获取写锁后可以再次获取写锁和读锁
- 锁降级:遵循获取写锁-> 获取读锁-> 释放写锁的次序,最终写锁降级为读锁
- 写锁是一个支持重进入的排它锁
- 读锁是一个支持重进入的共享锁
读写锁-使用示例
@Slf4j
public class ReentrantReadWriteLockExample {
private final Map<String, String> cache = new HashMap<>(100); // 内存数据
private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock readLock = lock.readLock(); // 读锁
private final Lock writeLock = lock.writeLock(); // 写锁
public String get(String key) { // 读操作
readLock.lock();
try {
return cache.get(key);
} finally {
readLock.unlock();
}
}
public void put(String key, String value) { // 写操作
writeLock.lock();
try {
cache.put(key, value);
} finally {
writeLock.unlock();
}
}
// ReentrantReadWriteLock 提供锁状态相关方法示例
public void logLockStatus() {
// e.g. 一个线程重入了 n 次读锁,那么 getReadHoldCount = 1,getReadLockCount = n
log.info("当前读锁被获取的次数 = {}", lock.getReadLockCount());
log.info("当前线程获取读锁的次数 = {}", lock.getReadHoldCount());
log.info("写锁是否被获取 = {}", lock.isWriteLocked());
log.info("写锁被获取的次数 = {}", lock.getWriteHoldCount());
}
}
读写锁实现分析
因为读写锁支持公平与非公平选择,内部实现机制为:
- 内部基于 AbstractQueuedSynchronizer(AQS)实现一个公平与非公平公共的父类 Sync ,用于管理同步状态
- FairSync 继承 Sync 用于处理公平问题
- NonfairSync 继承 Sync 用于处理非公平问题
- ReadLock 实现 Lock 接口,内部聚合 Sync
- WriteLock 实现 Lock 接口,内部聚合 Sync
读写锁的构造函数如下:
public ReentrantReadWriteLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
readerLock = new ReadLock(this);
writerLock = new WriteLock(this);
}
分析 ReentrantReadWriteLock 的实现
- 同步状态字段设计
- 写锁的获取与释放
- 读锁的获取与释放
- 锁降级
同步状态字段设计
读写锁的自定义同步器需要在同步状态 (一个整型变量) 上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
读写锁对于同步状态的实现是在一个整形变量上通过“按位切割使用”:将变量切割成两部分,
- 高 16 位表示读
- 低 16 位表示写。
假设当前同步状态值为 S,get 和 set 的操作如下:
- 获取写状态:S&0x0000FFFF,将高 16 位全部抹去
- 获取读状态:S>>>16,无符号补 0,右移 16 位
- 写状态加 1:S + 1
- 读状态加 1:S +(1<<16)即 S + 0x00010000
在代码层的判断中,S 不等于 0 时,当写状态 (S&0x0000FFFF) 等于 0 时,则读 状态 (S>>>16) 大于 0,即读锁已被获取。
写锁的获取与释放
写锁是一个支持重进入的排它锁。
- 重入性:如果当前线程已经获取了写锁,则增加写状态。
- 排他性:如果当前线程在获取写锁时,读锁已经被获取 (读状态不为 0) 或者该线程不是已经获取写锁的线程,则当前线程进入等待状态。
写锁的获取
分析 ReentrantReadWriteLock.Sync#tryAcquire 方法:
protected final boolean tryAcquire(int acquires) {
Thread current = Thread.currentThread();
int c = getState();
int w = exclusiveCount(c); // 根据同步状态获取写锁的个数
if (c != 0) { // 如果当前有写锁或者读锁
// 如果写锁为 0 或者当前线程不是独占线程(不符合重入),返回 false
if (w == 0 || current != getExclusiveOwnerThread())
return false;
if (w + exclusiveCount(acquires) > MAX_COUNT) // 写锁的个数超了最大值,抛出异常
throw new Error("Maximum lock count exceeded");
setState(c + acquires); // 重入获取
return true;
}
// 如果当前没有写锁或者读锁,或者写线程应该阻塞或者 CAS 失败,返回 false
if (writerShouldBlock() || !compareAndSetState(c, c + acquires))
return false;
setExclusiveOwnerThread(current); // 否则将当前线程置为获得写锁的线程
return true;
}
写锁的释放
写锁的释放与 ReentrantLock 的释放过程基本类似,每次释放均减少写状态,当写状态为 0 时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对后续读写线程可见。
读锁的获取与释放
读锁是一个支持重进入的共享锁,它能够被多个线程同时获取,在没有其他写线程访问 (或者写状态为 0) 时,读锁总会被成功地获取,而所做的也只是 (线程安全的) 增加读状态。
- 重入性:如果当前线程已经获取了读锁,则增加读状态。
- 与写锁互斥性:如果当前线程在获取读锁时,写锁已被其他线程获取 (写锁被自己获取时可以进行锁降级),则进入等待状态。
读锁的获取
分析 ReentrantReadWriteLock.Sync#tryAcquireShared 方法:
protected final int tryAcquireShared(int unused) {
Thread current = Thread.currentThread();
int c = getState();
// 一.如果当前有写线程并且本线程不是写线程,不符合重入
if (exclusiveCount(c) != 0 &&
getExclusiveOwnerThread() != current)
return -1;
int r = sharedCount(c); // 获取读锁数量
// 二.如果读不应该阻塞并且读锁的个数小于最大值 MAX_COUNT,并且可以成功更新状态值,成功
if (!readerShouldBlock() &&
r < MAX_COUNT &&
compareAndSetState(c, c + SHARED_UNIT)) {
if (r == 0) { // 如果当前读锁为 0
firstReader = current; // 第一个读线程就是当前线程
firstReaderHoldCount = 1;
} else if (firstReader == current) { // 如果当前线程重入了,firstReaderHoldCount++
firstReaderHoldCount++;
} else { // 当前读线程和第一个读线程不同,记录每一个线程读的次数
ReentrantReadWriteLock.Sync.HoldCounter rh = cachedHoldCounter;
if (rh == null ||
rh.tid != LockSupport.getThreadId(current))
cachedHoldCounter = rh = readHolds.get();
else if (rh.count == 0)
readHolds.set(rh);
rh.count++;
}
return 1;
}
return fullTryAcquireShared(current); // 三.第二步不满足时,循环尝试
}
读锁的释放
读锁的每次释放 (线程安全的,可能有多个读线程同时释放读锁) 均减少读状态,减少的值是 (1<<16)。
锁降级
锁降级指的是写锁降级成为读锁。锁降级是指把持住 (当前拥有的) 写锁,再获取到读锁,随后释放 (先前拥有的) 写锁的过程。
锁降级有什么用?
1、保证数据的可见性:
- 如果当前线程不获取读锁而是直接释放写锁,假设此刻另一个线程 (记作线程 T) 获取了写锁并修改了数据,那么当前线程无法感知线程 T 的数据更新。
- 如果当前线程获取读锁,即遵循锁降级的步骤,则线程 T 将会被阻塞,直到当前线程使用数据并释放读锁之后,线程 T 才能获取写锁进行数据更新。
2、提高性能:
- 存在一个事务性的写操作,分 N 段完成,比较耗时(事务不允许被其他写中断)
- 每完成一段操作后让降级为读锁进行读取操作
- 事务全部完成后释放写锁
为什么不支持锁升级?
不支持锁升级(把持读锁、获取写锁,最后释放读锁的过程)。目的也是保证数据可见性,如果读锁已被多个线程获取,其中任意线程成功获取了写锁并更新了数据,则其更新对其他获取到读锁的线程是不可见的。
锁相关工具类(LockSupport)
LockSupport 是什么
LockSupport 定义了一组的公共静态方法,这些方法提供了最基本的线程阻塞和唤醒功能,而 LockSupport 也成为构建同步组件的基础工具 (AQS 中大量使用了该工具类)。
提供方法说明
- park 开头的方法用来阻塞当前线程。
- unpark(Thread thread) 方法来唤醒一个被阻塞的线程。
LockSupport 方法中最终调用的是 Unsafe 中的 native 代码:
public class LockSupport {
private static final Unsafe U = Unsafe.getUnsafe();
// 为给定的线程提供许可证(如果尚未提供)。 如果线程在 park 被阻塞,那么它将被解除阻塞。
// 否则,其下一次拨打 park 保证不被阻止。 如果给定的线程尚未启动,则此操作无法保证完全没有任何影响。
public static void unpark(Thread thread)
// 禁用当前线程进行线程调度,如果调用 unpark(Thread thread) 或者当前线程被中断,才能返回
public static void park()
// 在 park 基础上增加了阻塞对象标识,用于问题排查和系统监控
public static void park(Object blocker)
// 在 park(Object blocker) 基础上增加了超时返回
public static void parkNanos(Object blocker, long nanos)
// 在 park 基础上增加超时返回
public static void parkNanos(long nanos)
// 在 park 基础上增加超时截止时间返回
public static void parkUntil(long deadline)
// 在 park(Object blocker) 基础上增加了超时截止时间设置,超时后直接返回
public static void parkUntil(Object blocker, long deadline)
}
底层实现原理
LockSupport.park 的实现原理是通过二元信号量做的阻塞,要注意的是,这个信号量最多只能加到 1。
我们也可以理解成获取释放许可证的场景。
- unpark 方法会释放一个许可证
- park 方法则是获取许可证,如果当前没有许可证,则进入休眠状态,直到许可证被释放了才被唤醒。
无论执行多少次 unpark 方法,也最多只会有一个许可证。
在 Linux 系统下,是用的 Posix 线程库 pthread 中的 mutex(互斥量),condition(条件变量)来实现的。mutex 和 condition 保护了一个_counter 的变量:
- 当 park 时,这个变量被设置为 0
- 当 unpark 时,这个变量被设置为 1
每个Java线程都有一个 Parker 实例,Parker 类是这样定义的:
class Parker : public os::PlatformParker {
private:
volatile int _counter ; // 记录“许可”
...
public:
void park(bool isAbsolute, jlong time);
void unpark();
...
}
class PlatformParker : public CHeapObj<mtInternal> {
protected:
pthread_mutex_t _mutex [1] ; // 互斥量
pthread_cond_t _cond [1] ; // 条件变量
...
}
底层实现原理- park 过程
void Parker::park(bool isAbsolute, jlong time) {
// 当调用 park 时,先尝试能否直接拿到“许可”,即_counter>0 时,如果成功,则把_counter 设置为 0,并返回:
if (Atomic::xchg(0, &_counter) > 0) return;
// 如果不成功,则构造一个 ThreadBlockInVM,然后检查_counter 是不是>0,
// 如果是,则把_counter 设置为 0,unlock mutex 并返回:
ThreadBlockInVM tbivm(jt);
if (_counter > 0) { // no wait needed
_counter = 0;
status = pthread_mutex_unlock(_mutex);
// 否则,再判断等待的时间,然后再调用 pthread_cond_wait 函数等待
// 如果等待返回,则把_counter 设置为 0,unlock mutex 并返回:
if (time == 0) {
status = pthread_cond_wait (_cond, _mutex) ;
}
_counter = 0 ;
status = pthread_mutex_unlock(_mutex) ;
assert_status(status == 0, status, "invariant") ;
OrderAccess::fence();
底层实现原理-unpark 过程
当 unpark 时,直接设置_counter 为 1,再 unlock mutex 返回。
如果_counter 之前的值是 0,则还要调用 pthread_cond_signal 唤醒在 park 中等待的线程:
void Parker::unpark() {
int s, status ;
status = pthread_mutex_lock(_mutex);
assert (status == 0, "invariant") ;
s = _counter;
_counter = 1;
if (s < 1) {
if (WorkAroundNPTLTimedWaitHang) {
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
} else {
status = pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
status = pthread_cond_signal (_cond) ;
assert (status == 0, "invariant") ;
}
} else {
pthread_mutex_unlock(_mutex);
assert (status == 0, "invariant") ;
}
}
锁等待通知机制(Condition)
Condition 是什么
Condition 定义了等待/通知两种类型的方法,当前线程调用这些方法时,需要提前获取到 Condition 对象关联的锁。
Condition 对象是由 Lock 对象 (调用 Lock 对象的 newCondition() 方法) 创建出来的,Condition 是依赖 Lock 对象的。
Condition 提供方法说明
public interface Condition {
// 当前线程进入等待。
// 其他线程调用该 Condition 的 signal/signalAll 方法是被唤醒
// 其他线程调用 interrupt 方法中断当前线程
// 如果当前等待线程从 await 返回,表示该线程已经获取了 Condition 对象所在的锁
void await() throws InterruptedException;
// 在 await 方法基础上取消了响应中断的处理
void awaitUninterruptibly();
// 在 await 方法基础上支持超时返回
long awaitNanos(long nanosTimeout) throws InterruptedException;
// 在 await 方法基础上支持超时返回
boolean await(long time, TimeUnit unit) throws InterruptedException;
// 在 await 方法基础上支持超过截止时间返回
boolean awaitUntil(Date deadline) throws InterruptedException;
// 唤醒一个等待在 Condition 上的线程,该线程从等待方法返回前必须获得与 Condition 相关联的锁
void signal();
// 唤醒所有等待在 Condition 上的线程,能够从等待方法返回的线程必须获得与 Condition 相关联的锁
void signalAll();
}
Condition 实现分析
Condition 实现分析-等待队列
- 同步队列:同步队列是 AQS 中等待获取同步状态的队列。
- 等待队列:等待队列是一个 FIFO 的队列,在队列中的每个节点都包含了一个线程引用,该线程就是在 Condition 对象上等待的线程。
Condition 拥有首尾节点的引用,而新增节点只需要将原有的尾节点 nextWaiter 指向它,并且更新尾节点即可。
如果一个线程调用了 Condition.await 方法,那么该线程将会释放锁、构造成节点加入等待队列并进入等待状态。
上述节点引用更新的过程并没有使用 CAS 保证,原因在于调用 await 方法的线程必定是获取了锁的线程,也就是说该过程是由锁来保证线程安全的。
public class ConditionObject implements Condition, java.io.Serializable {
/** 首节点. */
private transient Node firstWaiter;
/** 尾节点. */
private transient Node lastWaiter;
...
}
与监视器(wait/notify)的队列区别:
- 在 Object 的监视器(wait/notify)模型上,一个对象拥有一个同步队列和等待队列
- 并发包中的 Lock(确切地说是同步器 AQS) 拥有一个同步队列和多个等待队列
Condition 实现分析-等待
调用 Condition 的 await 方法 (或以 await 开头的方法),会使当前线程进入等待队列并释放锁,同时线程状态变为等待状态。
当从 await 方法返回时,当前线程一定获取了 Condition 相关联的锁。
如果从队列的角度看 await 方法,当调用 await 方法时,相当于同步队列的首节点 (获取了锁的节点) 移动到 Condition 的等待队列中。
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter(); // 当前线程加入等待队列
int savedState = fullyRelease(node); // 释放同步状态,也就是释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); // 进入等待状态
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 唤醒节点的线程开始尝试获取同步状态
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
- 调用 await 方法的线程已经成功获取了锁的线程,也就是同步队列中的首节点。
- 该方法会将当前线程构造成节点并加入等待队列中,然后释放同步状态,唤醒同步队列中的后继节点,然后当前线程会进入等待状态。
- 当等待队列中的节点被唤醒,则唤醒节点的线程开始尝试获取同步状态。
- 如果不是通过其他线程调用 Condition.signal 方法唤醒,而是对等待线程进行中断,则会抛出 InterruptedException。
Condition 实现分析-通知
调用 Condition 的 signal 方法,将会唤醒在等待队列中等待时间最长的节点 (首节点),在唤醒节点之前,会将节点移到同步队列中。
public final void signal() {
if (!isHeldExclusively()) // 当前线程必须是获取了锁的线程
throw new IllegalMonitorStateException();
Node first = firstWaiter; // 获取等待队列的首节点
if (first != null)
doSignal(first);
}
private void doSignal(Node first) {
do {
if ((firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!node.compareAndSetWaitStatus(Node.CONDITION, 0))
return false;
Node p = enq(node); // 等待队列中的头节点线程安全地移动到同步队列
int ws = p.waitStatus;
if (ws > 0 || !p.compareAndSetWaitStatus(ws, Node.SIGNAL)) // 节点状态更改为待唤醒状态
LockSupport.unpark(node.thread); // 唤醒节点
return true;
}
死锁问题及解决方案
什么是死锁
两个或多个线程互相持有对方需要的锁而导致这些线程全部处于永久阻塞状态。
如:线程 A 持有对象 1 的锁,等待对象 2 的锁;线程 B 持有对象 2 的锁,等待对象 1 的锁。
什么时候出现死锁
出现死锁必须具备以下几点:
- 要有两个或两个以上的线程
- 至少有两个共享资源的锁
- 至少存在两个线程各自拥有一个锁
- 现在这两个线程在等待获取彼此的锁,这就出现死锁了
死锁代码模拟
模拟逻辑说明:
两个线程 thread1、thread2 运行过程中需要在 lock1、lock2 两个共享资源上加锁。
- thread1 加锁顺序为 lock1->lock2
- thread1 加锁顺序为 lock2->lock1
synchronized 关键字模拟死锁场景
// synchronized 关键字出现死锁的场景
class SynchronizedDeadLockExample {
public static void main(String[] args) {
final Object lock1 = new Object();
final Object lock2 = new Object();
final Thread thread1 = new Thread(() -> {
synchronized (lock1) { // 对 lock1 加锁
System.out.println("lock1");
try {
sleep(200);
} catch (InterruptedException e) {
}
synchronized (lock2) { // 对 lock2 加锁
System.out.println("lock1->lock2");
}
}
});
thread1.setName("Thread-DeadLockExample-1");
final Thread thread2 = new Thread(() -> {
synchronized (lock2) { // 对 lock2 加锁
System.out.println("lock2");
try {
sleep(200);
} catch (InterruptedException e) {
}
synchronized (lock1) { // 对 lock1 加锁
System.out.println("lock2->lock1");
}
}
});
thread2.setName("Thread-DeadLockExample-2");
thread1.start();
thread2.start();
}
}
ReentrantLock 模拟死锁场景
// 注意:lock 加锁解锁代码为了方便演示移除 try {} finally {},实际开发不推荐这种写法
class LockDeadLockExample {
public static void main(String[] args) {
final ReentrantLock lock1 = new ReentrantLock();
final ReentrantLock lock2 = new ReentrantLock();
final Thread thread1 = new Thread(() -> {
lock1.lock();
System.out.println("lock1");
try {
sleep(200);
} catch (InterruptedException e) {
}
lock2.lock();
System.out.println("lock1->lock2");
lock2.unlock();
lock1.unlock();
});
thread1.setName("Thread-DeadLockExample-1");
final Thread thread2 = new Thread(() -> {
lock2.lock();
System.out.println("lock2");
try {
sleep(200);
} catch (InterruptedException e) {
}
lock1.lock();
System.out.println("lock2->lock1");
lock1.unlock();
lock2.unlock();
});
thread2.setName("Thread-DeadLockExample-2");
thread1.start();
thread2.start();
}
}
如何避免死锁
- 加锁顺序:保证获取锁的顺序一致,例如示例中永远以 lock1->lock2 顺序加锁解锁
- 加锁时限:如果一个线程没有在指定的时间期限内获取到锁,则结束当前线程并释放掉已获得的锁
- 死锁检测:利用开源工具扫描检测
锁总结
Java 锁的分类没有严格意义的规则,我们常说的分类一般都是依据锁的特性、锁的设计、锁的状态等进行归纳整理。
- 乐观锁、悲观锁
- 自旋锁、适应性自旋锁
- 无锁、偏向锁、轻量级锁、重量级锁
- 公平锁、非公平锁
- 可重入锁、非可重入锁
- 排他锁、共享锁
- 读写锁
- 分段锁
- 可中断锁
乐观锁、悲观锁
乐观锁与悲观锁是一种广义上的概念,体现了看待线程同步的不同角度。在 Java 和数据库中都有此概念对应的实际应用。
对于同一个数据的并发操作:
- 悲观锁:认为自己在使用数据的时候一定有别的线程来修改数据,因此在获取数据的时候会先加锁,确保数据不会被别的线程修改。
- 乐观锁:认为自己在使用数据时不会有别的线程修改数据,所以不会添加锁,只是在更新数据的时候去判断之前有没有别的线程更新了这个数据。如果这个数据没有被更新,当前线程将自己修改的数据成功写入。如果数据已经被其他线程更新,则根据不同的实现方式执行不同的操作(例如报错或者自动重试)。
实现
- 乐观锁
- 在 Java 中是通过使用无锁编程来实现,最常采用的是CAS 算法,Java 原子类中的递增操作就通过 CAS 自旋实现的。
- 悲观锁
- synchronized
- Lock 实现类
场景分析
- 乐观锁适合读操作多的场景,不加锁的特点能够使其读操作的性能大幅提升。
- 悲观锁适合写操作多的场景,先加锁可以保证写操作时数据正确。
自旋锁、适应性自旋锁
自旋锁的概念
阻塞或唤醒一个 Java 线程需要操作系统切换 CPU 状态来完成,这种状态转换需要耗费处理器时间。如果同步代码块中的内容过于简单,状态转换消耗的时间有可能比用户代码执行的时间还要长。
在许多场景中,同步资源的锁定时间很短,为了这一小段时间去切换线程,线程挂起和恢复现场的花费可能会让系统得不偿失。如果物理机器有多个处理器,能够让两个或以上的线程同时并行执行,我们就可以让后面那个请求锁的线程不放弃 CPU 的执行时间,看看持有锁的线程是否很快就会释放锁。
为了让当前线程“稍等一下”,我们需让当前线程进行自旋,如果在自旋完成后前面锁定同步资源的线程已经释放了锁,那么当前线程就可以不必阻塞而是直接获取同步资源,从而避免切换线程的开销。这就是自旋锁。
适应性自旋锁
自旋锁在 JDK1.4.2 中引入,使用-XX:+UseSpinning 来开启。JDK 6 中变为默认开启,并且引入了自适应的自旋锁(适应性自旋锁)。
自适应意味着自旋的时间(次数)不再固定,而是由前一次在同一个锁上的自旋时间及锁的拥有者的状态来决定。
- 如果在同一个锁对象上,自旋等待刚刚成功获得过锁,并且持有锁的线程正在运行中,那么虚拟机就会认为这次自旋也是很有可能再次成功,进而它将允许自旋等待持续相对更长的时间。
- 如果对于某个锁,自旋很少成功获得过,那在以后尝试获取这个锁时将可能省略掉自旋过程,直接阻塞线程,避免浪费处理器资源。
缺点
自旋锁本身是有缺点的,它不能代替阻塞。自旋等待虽然避免了线程切换的开销,但它要占用处理器时间。如果锁被占用的时间很短,自旋等待的效果就会非常好。反之,如果锁被占用的时间很长,那么自旋的线程只会白浪费处理器资源。
所以,自旋等待的时间必须要有一定的限度,如果自旋超过了限定次数(默认是 10 次,可以使用-XX:PreBlockSpin 来更改)没有成功获得锁,就应当挂起线程。
实现
自旋锁的实现原理同样也是 CAS。
AtomicInteger 中调用 unsafe 进行自增操作的源码中的 do-while 循环就是一个自旋操作,如果修改数值失败则通过循环来执行自旋,直至修改成功。
无锁、偏向锁、轻量级锁、重量级锁
Java SE 1.6 为了减少获得锁和释放锁带来的性能消耗,引入了 “偏向锁” 和 “轻量级锁”。
Java SE 1.6 中,锁一共有 4 种状态,级别从低到高依次是: 无锁状态、偏向锁状态、轻量级锁状态和重量级锁状态,这几个状态会随着竞争情况逐渐升级。锁可以升级但不能降级,意味着偏向锁升级成轻量级锁后不能降级成偏向锁。这种锁升级却不能降级的策略,目的是为了提高获得锁和释放锁的效率。
实现
- synchronized
公平锁、非公平锁
公平锁
公平锁是指多个线程按照申请锁的顺序来获取锁,线程直接进入队列中排队,队列中的第一个线程才能获得锁。
- 优点是等待锁的线程不会饿死。
- 缺点是整体吞吐效率相对非公平锁要低,等待队列中除第一个线程以外的所有线程都会阻塞,CPU 唤醒阻塞线程的开销比非公平锁大。
非公平锁
非公平锁是多个线程加锁时直接尝试获取锁,获取不到才会到等待队列的队尾等待。但如果此时锁刚好可用,那么这个线程可以无需阻塞直接获取到锁,所以非公平锁有可能出现后申请锁的线程先获取锁的场景。
- 优点是可以减少唤起线程的开销,整体的吞吐效率高,因为线程有几率不阻塞直接获得锁,CPU 不必唤醒所有线程。
- 缺点是处于等待队列中的线程可能会饿死,或者等很久才会获得锁。
实现
- ReentrantLock
- ReentrantReadWriteLock
可重入锁、非可重入锁
可重入锁又名递归锁,是指在同一个线程在外层方法获取锁的时候,再进入该线程的内层方法会自动获取锁(前提锁对象得是同一个对象或者 class),不会因为之前已经获取过还没释放而阻塞。可重入锁的一个优点是可一定程度避免死锁。
为什么非可重入锁会出现死锁?
ReentrantLock 和 synchronized 都是重入锁,那么我们通过重入锁 ReentrantLock 以及非可重入锁 NonReentrantLock 的源码来对比分析一下为什么非可重入锁在重复调用同步资源时会出现死锁。
ReentrantLock 继承父类 AQS,AQS 中维护了一个同步状态 status 来计数重入次数,status 初始值为 0。
获取锁时
- 可重入锁先尝试获取并更新 status 值,如果 status == 0 表示没有其他线程在执行同步代码,则把 status 置为 1,当前线程开始执行。如果 status != 0,则判断当前线程是否是获取到这个锁的线程,如果是的话执行 status+1,且当前线程可以再次获取锁。
- 非可重入锁是直接去获取并尝试更新当前 status 的值,如果 status != 0 的话会导致其获取锁失败,当前线程阻塞。
释放锁时
- 可重入锁同样先获取当前 status 的值,在当前线程是持有锁的线程的前提下。如果 status-1 == 0,则表示当前线程所有重复获取锁的操作都已经执行完毕,然后该线程才会真正释放锁。
- 非可重入锁则是在确定当前线程是持有锁的线程之后,直接将 status 置为 0,将锁释放。
实现
- 可重入锁
- ReentrantLock
- synchronized
- ReentrantReadWriteLock 的读写锁
- 非可重入锁
- 暂无
排他锁、共享锁
排他锁和共享锁同样是一种概念。
排他锁是指该锁一次只能被一个线程所持有。如果线程 T 对数据 A 加上排它锁后,则其他线程不能再对 A 加任何类型的锁。获得排它锁的线程即能读数据又能修改数据。
共享锁是指该锁可被多个线程所持有。如果线程 T 对数据 A 加上共享锁后,则其他线程只能对 A 再加共享锁,不能加排它锁。获得共享锁的线程只能读数据,不能修改数据。
通过 ReentrantLock 和 ReentrantReadWriteLock 的源码来介绍排他锁和共享锁。
排他锁与共享锁也是通过 AQS 来实现的,通过实现不同的方法,来实现独享或者共享。
ReentrantReadWriteLock 分析
我们看到 ReentrantReadWriteLock 有两把锁:ReadLock 和 WriteLock,一个读锁一个写锁,合称“读写锁”。
进一步观察可以发现 ReadLock 和 WriteLock 是靠内部类 Sync 实现的锁。Sync 是 AQS 的一个子类,这种结构在 CountDownLatch、ReentrantLock、Semaphore 里面也都存在。
在 ReentrantReadWriteLock 里面,读锁和写锁的锁主体都是 Sync,但读锁和写锁的加锁方式不一样。
读锁是共享锁,写锁是排他锁。读锁的共享锁可保证并发读非常高效,而读写、写读、写写的过程互斥,因为读锁和写锁是分离的。所以 ReentrantReadWriteLock 的并发性相比一般的互斥锁有了很大提升。
ReentrantLock 分析
我们发现在 ReentrantLock 虽然有公平锁和非公平锁两种,但是它们添加的都是排他锁。根据源码所示,当某一个线程调用 lock 方法获取锁时,如果同步资源没有被其他线程锁住,那么当前线程在使用 CAS 更新 state 成功后就会成功抢占该资源。而如果公共资源被占用且不是被当前线程占用,那么就会加锁失败。所以可以确定 ReentrantLock 无论读操作还是写操作,添加的锁都是都是排他锁。
实现
- 排他锁
- synchronized
- Lock 相关实现类,ReentrantReadWriteLock 写锁
- Lock 相关实现类,ReentrantLock
- 共享锁
- ReentrantReadWriteLock 读锁
分段锁
实质是一种锁的设计策略,不是具体的锁,对于 ConcurrentHashMap 而言其并发的实现就是通过分段锁的形式来实现高效并发操作。
当要 put 元素时并不是对整个 hashMap 加锁,而是先通过 hashcode 知道它要放在哪个分段,然后对分段进行加锁,所以多线程 put 元素时只要放在的不是同一个分段就做到了真正的并行插入,但是统计 size 时就需要获取所有的分段锁才能统计。
分段锁的设计是为了细化锁的粒度。
可中断锁
synchronized 是不可中断的,Lock 是可中断的,这里的可中断建立在阻塞等待中断,运行中是无法中断的。
实现
- Lock 实现类提供响应中断状态的加锁
四、并发容器-阻塞队列
BlockingQueue 阻塞队列是什么
- 队列(Queue),是一种特殊的线性表,它只允许在表的前端进行删除操作,而在表的后端进行插入操作。
- 阻塞与非阻塞,关注的是程序在等待调用结果(消息,返回值)时的状态.
- 阻塞调用是指调用结果返回之前,当前线程会被挂起。调用线程只有在得到结果之后才会返回。
- 非阻塞调用指在不能立刻得到结果之前,该调用不会阻塞当前线程。
阻塞队列(BlockingQueue 接口)是一个支持阻塞的插入和移除方法的队列,主要采用锁实现。
- 支持阻塞的插入方法 : 意思是当队列满时,队列会阻塞插入元素的线程,直到队列不满
- 支持阻塞的移除方法 : 意思是在队列为空时,获取元素的线程会等待队列变为非空
BlockingQueue 接口相关方法说明
抛出异常的操作方法
- add 方法:插入列尾,当队列满时,会抛出 IllegalStateException 异常。
- remove 方法:返回并移除队列头,当队列为空时,从队列里获取元素会抛出 NoSuchElementException 异常。
- element 方法:返回队列头(不移除),当队列满时,会抛出 IllegalStateException 异常。
返回特殊值的操作方法
- offer 方法:插入列尾,成功返回 true。
- poll 方法:返回并移除队列头,如果没有元素则返回 null。
- peek 方法:返回队列头(不移除),如果没有元素则返回 null。
一直阻塞的操作方法
- put 方法:插入列尾,当队列满时阻塞等待,直到队列可用或者响应中断退出。
- take 方法:返回并移除队列头,当队列为空时阻塞等待,直到队列可用或者响应中断退出。
BlockingQueue 接口的实现类有哪些
JDK1.8 提供了 7 个阻塞队列,如下:
- ArrayBlockingQueue : 数组结构组成的有界阻塞队列
- LinkedBlockingQueue : 链表结构组成的有界阻塞队列
- PriorityBlockingQueue : 支持优先级排序的无界阻塞队列
- DelayQueue : 支持延时获取元素的无界阻塞队列
- SynchronousQueue : 不存储元素的阻塞队列
- LinkedTransferQueue : 由链表结构组成的无界阻塞队列
- LinkedBlockingDeque : 由链表结构组成的双向阻塞队列
ArrayBlockingQueue-数组、有界
一个用数组实现的有界阻塞队列。此队列按照先进先出 (FIFO) 的原则对元素进行排序。
-
特性:
- 支持公平、非公平选择
实现原理
- 内部维护一个
Object[] items
数组 - 使用 ReentrantLock 锁、 2 个 Condition 条件(notEmpty、notFull)完成并发访问
- 对该队列的数组操作使用 ReentrantLock 加锁解锁,阻塞条件使用 Condition 等待通知机制完成
LinkedBlockingQueue-链表、有界
一个用链表实现的有界阻塞队列。此队列的默认和最大长度为 Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
实现原理:
- 内部维护链表的头节点、尾节点
- 使用 ReentrantLock putLock 、Condition notFull 管理入队操作
- 使用 ReentrantLock takeLock 、Condition notEmpty 管理出队操作
- 完全的生产者消费者模式的实现
PriorityBlockingQueue-优先级排序、无界
一个支持优先级的无界阻塞队列。默认情况下元素采取自然顺序升序排列。也可以自定义类实现 compareTo() 方法来指定元素排序规则,或者初始化 PriorityBlockingQueue 时,指定构造参数 Comparator 来对元素进行排序。需要注意的是不能保证同优先级元素的顺序。
实现原理:
- 内部维护一个
Object[] items
数组 - 使用 ReentrantLock lock 、Condition notEmpty 管理入队出队操作(因为无界,无需 notFull 条件)
- 入队时,判断是否需要扩容,插入数组(重新排序(最小堆))
- 出队时,移除下标为 0 的数据(列头),重新排序
DelayQueue-延时获取、无界
一个支持延时获取元素的无界阻塞队列。队列使用 PriorityQueue 来实现。
队列中的元素必须实现 Delayed 接口,在创建元素时可以指定多久才能从队列中获取当前元素。只有在延迟期满时才能从队列中提取元素。
实现原理:
- 内部维护一个 PriorityQueue(支持排序的队列实现类)
- 使用 ReentrantLock lock 、Condition available 管理入队出队操作(因为无界,无需 notFull 条件)
- 入队时,数据添加至 PriorityQueue 队列,判断当前头节点是否等于添加的数据,如果不是唤醒出队操作
- 出队时,for 循环自旋,取出头节点如果达到执行时间即返回,否则进入等待
SynchronousQueue-不存储元素
一个不存储元素的阻塞队列。每一个 put 操作必须等待一个 take 操作,否则不能继续添加元素。
SynchronousQueue 可以看成是一个传球手,负责把生产者线程处理的数据直接传递给消费者线程。队列本身并不存储任何元素,非常适合传递性场景。SynchronousQueue 的吞吐量高于 LinkedBlockingQueue 和 ArrayBlockingQueue。
- 特性:
- 支持公平、非公平选择
LinkedTransferQueue-链表、无界
一个由链表结构组成的无界阻塞 TransferQueue 队列。相对于其他阻塞队列,LinkedTransferQueue 多了 tryTransfer 和 transfer 方法。
方法解读:
- transfer 方法 :可以把生产者传入的元素立刻"传输"给消费者。如果没有消费者在等待接收元素,transfer 方法会将元素存放在队列的 tail 节点,并等到该元素被消费者消费了才返回
- tryTransfer 方法 : 用来试探生产者传入的元素是否能直接传给消费者。如果没有消费者等待接收元素,则返回 false。
- transfer 和 transfer 方法的区别: tryTransfer 方法无论消费者是否接收,方法立即返回,而 transfer 方法是必须等到消费者消费了才返回。
LinkedBlockingDeque-链表、双向阻塞队列
一个由链表结构组成的双向阻塞队列。所谓双向队列指的是可以从队列的两端插入和移出元素。双向队列因为多了一个操作队列的入口,在多线程同时入队时,也就减少了一半的竞争。
相比其他的阻塞队列,LinkedBlockingDeque 多了 addFirst、 addLast、offerFirst、offerLast、peekFirst 和 peekLast 等方法,以 First 单词结尾的方法,表示插入、获取 (peek) 或移除双端队列的第一个元素。
以 Last 单词结尾的方法,表示插入、获取或移除双 端队列的最后一个元素。
另外,插入方法 add 等同于 addLast,移除方法 remove 等效于 removeFirst。
但是 take 方法却等同于 takeFirst,不知道是不是 JDK 的 bug,使用时还是用带有 First 和 Last 后缀的方法更清楚。
应用场景
- 生产消费者模型
并发容器-ConcurrentLinkedQueue
ConcurrentLinkedQueue 是什么
阻塞的实现方式可以用锁,非阻塞的实现方式可以使用循环 CAS 的方式来实现。
ConcurrentLinkedQueue 是一个基于链接节点的无界线程安全队列,主要特性:
- 非阻塞 CAS 操作实现
- 它采用先进先出的规则对节点进行排序
- 当我们添加一个元素的时候,它会添加到队列的尾部
- 当我们获取一个元素时,它会返回队列头部的元素。
ConcurrentLinkedQueue 内部数据结构
ConcurrentLinkedQueue 由 head 节点和 tail 节点组成,每个节点 (Node) 由节点元素 (item) 和指向下一个节点 (next) 的引用组成,节点与节点之间就是通过这个 next 关联起来,从而组成一 张链表结构的队列。默认情况下 head 节点存储的元素为空,tail 节点等于 head 节点。
private transient volatile Node<E> head; // head 节点
private transient volatile Node<E> tail; // tail 节点,不一定是最后一个节点
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null); // 默认相等
}
ConcurrentLinkedQueue 入队操作解析
由于 ConcurrentLinkedQueue 是无界的,所以 offer 永远返回 true
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) { // p 用来表示队列的尾节点,默认情况下等于 tail 节点。
Node<E> q = p.next; // p 的 next 节点
// p 没有 next 节点
if (q == null) {
if (p.casNext(null, newNode)) {
// 如果 p != t,则将入队结点设置成 tail 结点,更新失败了也没关系
// 因为失败了表示有其他线程成功更新了 tail 结点
if (p != t)
casTail(t, newNode); // cas 修改尾节点
return true;
}
}
// 多线程操作时候,由于 poll 方法会把旧的 head 变为自引用,然后将 head 的 next 设置为新的 head
// 所以需要重新找新的 head,因为新的 head 后面的节点才是激活的节点
else if (p == q) // p 有 next 节点,且是 tail 节点
p = (t != (t = tail)) ? t : head;
// 寻找尾节点
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
主要逻辑:
- 将入队节点设置成当前队列尾节点的下一个节点;
- 更新 tail 节点
- 如果 tail 节点的 next 节点不为空,则将入队节点设置成 tail 节点,
- 如果 tail 节点的 next 节点为空,则将入队节点设置成 tail 的 next 节点,所以 tail 节点不总是尾节点
tail 节点并不总是尾节点,所以每次入队都必须先通过 tail 节点来找到尾节点。尾节点可能是 tail 节点,也可能是 tail 节点的 next 节点。
ConcurrentLinkedQueue 出队操作解析
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item; // 获取头节点的元素
// 头节点元素不为空,CAS 的方式将头节点的引用设置成 null
if (item != null && p.casItem(item, null)) {
if (p != h) // 头节点已变更
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) { // 头节点已经变更更新头节点
updateHead(h, p);
return null;
}
else if (p == q) // 已经出队跳出
continue restartFromHead;
else
p = q; // 重新寻找
}
}
}
主要逻辑:
- 首先获取头节点的元素,然后判断头节点元素是否为空
- 如果为空,表示另外一个线程已经进行了一次出队操作将该节点的元素取走
- 如果不为空,则使用 CAS 的方式将头节点的引用设置成 null
- 如果 CAS 成功,则直接返回头节点的元素
- 如果 CAS 不成功,表示另外一个线程已经进行了一次出队操作更新了 head 节点,导致元素发生了变化,需要重新获取头节点
并行任务框架-ForkJoin
Fork/Join 框架是什么
是 Java 7 提供的一个用于并行执行任务的框架,是一个把大任务分割成若干个小任务,最终汇总每个小任务结果后得到大任务结果的框架。
- Fork 就是把一个大任务切分为若干子任务并行的执行。
- Join 就是合并这些子任务的执行结果,最后得到这个大任务的结果。
Fork/Join 框架根据工作窃取算法设计,那么什么是工作窃取算法呢?
工作窃取算法介绍
工作窃取算法(work-stealing)是指某个线程从其他队列里窃取任务来执行。
为什么需要使用工作窃取算法呢?
假如我们有若干线程一起计算,可能有效线程的计算早早结束,结束的线程与其等着,不如去帮其他线程干活,于是它就去其他线程的队列里窃取一个任务来执行。
而在这时它们会访问同一个队列,所以为了减少窃取任务线程和被窃取任务线程之间的竞争,通常会使用双端队列维护:
- 被窃取任务线程永远从双端队列的头部拿任务执行。
- 窃取任务的线程永远从双端队列的尾部拿任务执行。
优缺点:
- 优点:充分利用线程进行并行计算,减少了线程间的竞争。
- 缺点:在某些情况下还是存在竞争,比如双端队列里只有一个任务时。并且该算法会消耗了更多的系统资源,比如创建多个线程和多个双端队列。
Fork/Join 框架的设计
Fork,分割任务。首先我们需要有一个 fork 类来把大任务分割成子任务,有可能子任务还是很大,所以还需要不停地分割,直到分割出的子任务足够小。抽象类 ForkJoinTask 提供了 2 个子抽象类:
- RecursiveAction:用于没有返回结果的任务。
- RecursiveTask:用于有返回结果的任务。
Join,执行任务并合并结果。分割的子任务分别放在双端队列里,然后几个启动线程分别从双端队列里获取任务执行。子任务执行完的结果都统一放在一个队列里,启动一个线程从队列里拿数据,然后合并这些数据。
- ForkJoinPool:ForkJoinTask 需要通过 ForkJoinPool 来执行。
Fork/Join 框架实现原理
Fork 处理逻辑
- 如果当前线程是 ForkJoinWorkerThread 类型的线程则任务提交到依赖的 ForkJoinPool 中执行
- 否则使用一个静态公用的 ForkJoinPool 执行
- 提交过程为:把当前任务存放在 ForkJoinTask 数组队列里。然后再调用 ForkJoinPool 的 signalWork 方法唤醒或创建一个工作线程来执行任务。
方法源码参考:
// ForkJoinTask.ForkJoinTask 方法
public final ForkJoinTask<V> fork() {
Thread t;
if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread)
((ForkJoinWorkerThread)t).workQueue.push(this);
else
ForkJoinPool.common.externalPush(this);
return this;
}
// WorkQueue.push 方法
final void push(ForkJoinTask<?> task) {
ForkJoinTask<?>[] a; ForkJoinPool p;
int b = base, s = top, n;
if ((a = array) != null) { // ignore if queue removed
int m = a.length - 1; // fenced write for task visibility
U.putOrderedObject(a, ((m & s) << ASHIFT) + ABASE, task);
U.putOrderedInt(this, QTOP, s + 1);
if ((n = s - b) <= 1) {
if ((p = pool) != null)
p.signalWork(p.workQueues, this);
}
else if (n >= m)
growArray();
}
}
Join 处理逻辑
任务状态有 4 种:已完成 (NORMAL)、被取消 (CANCELLED)、信号 (SIGNAL) 和出现异常 (EXCEPTIONAL)。
- 调用 doJoin 方法,得到当前任务的状态来判断返回什么结果
- 如果任务状态是已完成,则直接返回任务结果。
- 如果任务状态是被取消,则直接抛出 CancellationException。
- 如果任务状态是抛出异常,则直接抛出对应的异常。
- 在 doJoin 方法里,首先通过查看任务的状态,看任务是否已经执行完成。
- 如果执行完成,则直接返回任务状态;
- 如果没有执行完,则从任务数组里取出任务并执行。
- 如果任务顺利执行完成,则设置任务状态为 NORMAL,
- 如果出现异常,则记录异常,并将任务状态设置为 EXCEPTIONAL。
public final V join() {
int s;
if ((s = doJoin() & DONE_MASK) != NORMAL)
reportException(s);
return getRawResult();
}
private int doJoin() {
int s; Thread t; ForkJoinWorkerThread wt; ForkJoinPool.WorkQueue w;
return (s = status) < 0 ? s :
((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) ?
(w = (wt = (ForkJoinWorkerThread)t).workQueue).
tryUnpush(this) && (s = doExec()) < 0 ? s :
wt.pool.awaitJoin(w, this, 0L) :
externalAwaitDone();
}
Fork/Join 框架代码实战
计算 1+2+3+…+100,如果加数之间差值大于等于 10 则拆分为子任务
@Slf4j
@Getter
@AllArgsConstructor
public class ForkJoinExample extends RecursiveTask<Integer> {
private static final int THRESHOLD = 10; // 阈值
private int start;
private int end;
@Override
protected Integer compute() {
int sum = 0;
boolean canCompute = (end - start) <= THRESHOLD;
if (canCompute) {
for (int i = start; i <= end; i++) {
sum += i;
}
} else {
// 如果任务大于阈值,就分裂成两个子任务计算
int middle = (start + end) / 2;
final ForkJoinExample leftTask = new ForkJoinExample(start, middle);
final ForkJoinExample rightTask = new ForkJoinExample(middle + 1, end);
leftTask.fork();
rightTask.fork();
// 等待子任务执行完,并得到其结果
int leftResult = leftTask.join();
int rightResult = rightTask.join();
// 合并子任务
sum = leftResult + rightResult;
}
return sum;
}
public static void main(String[] args) {
final ForkJoinPool forkJoinPool = new ForkJoinPool();
// 生成一个计算任务,负责计算
final ForkJoinExample task = new ForkJoinExample(1, 100);
// 异步执行一个任务
final Future<Integer> result = forkJoinPool.submit(task);
// final Integer r2 = forkJoinPool.invoke(task); // 同步执行
try {
log.info("sum = {}", result.get());
} catch (InterruptedException | ExecutionException ignored) {
}
}
}
应用场景
- 可以采用"分而治之"的算法场景
- 计算密集型的任务
等待多线程完成-CountDownLatch
CountDownLatch 是什么?
CountDownLatch 允许一个或多个线程等待其他线程完成操作。
CountDownLatch 如何使用
- CountDownLatch 的构造方法需要传入一个计数值,计数值即需要等待的线程数量。
- 任务封装为线程模式异步运行,每个任务完成后调用 countDown 方法(计数值 -1)
- 调用 await 方法,当前线程进入等待状态
- 计数值为 0 时,将从 await 方法返回,继续执行后续逻辑
我们模拟一个方法,可以解析 excel 工作簿 ,每个工作簿解析由单独线程完成,全部完成后返回执行结果,伪代码如下:
public class CountDownLatchExample {
// 线程池
private final ExecutorService executor = Executors.newFixedThreadPool(2);
// 解析 excel 工作簿,sheets 为多个工作簿
public boolean resolveExcel(List<Object> sheets) {
// 每个工作簿解析为一个计数
final CountDownLatch latch = new CountDownLatch(sheets.size());
// 每个工作簿封装为一个工作线程提交到线程池,完成后计数 -1
sheets.forEach(o ->
executor.submit(() -> {
log.info("解析工作簿... {}", o);
latch.countDown(); // 完成后计数 -1
}));
try {
return latch.await(2, TimeUnit.MINUTES); // 等待 2 min
} catch (InterruptedException e) {
log.warn("解析超时,当前计数[{}]", latch.getCount());
}
return false;
}
}
CountDownLatch 实现原理
部分关键源码解读,需要 AbstractQueuedSynchronizer(AQS) 基础知识:
public class CountDownLatch {
// 基于 AQS 实现的共享式同步状态相关操作
private static final class Sync extends AbstractQueuedSynchronizer
// 创建 CountDownLatch ,需指定计数器
public CountDownLatch(int count)
// 进入等待状态。
// 共享式的获取同步状态,响应中断
public void await() throws InterruptedException
// 进入等待状态,在 await 基础上支持超时等待。
public boolean await(long timeout, TimeUnit unit) throws InterruptedException
// 当前计数 -1 ,底层表现为,同步状态值 -1
public void countDown()
// 获取当前计数结果 ,底层表现为,当前同步状态值
public long getCount()
}
主要逻辑为:
- 初始化一个共享式锁,初始化时同步状态设置为计数值 N(理解为:初始化后已经被 N 个线程持有锁了)
- 调用 wait 方法后,当前线程进入等待状态,等待同步状态为 0 时获取锁
- 每个线程完成后调用 countDown 方法,即同步状态 -1
- 所有线程完成后,同步状态为 0,wait 方法所在线程获取锁继续执行
CountDownLatch 和 join 的区别
join 用于让当前执行线程等待 join 线程执行结束。其实现原理是不停检查 join 线程是否存活,如果 join 线程存活则让当前线程永远等待。
- 调用 join 方法需要等待 thread 执行完毕才能继续向下执行
- CountDownLatch 只需要检查计数器的值为零就可以继续向下执行
相比之下,CountDownLatch 更加灵活一些,可以实现一些更加复杂的业务场景。
应用场景
- 需要等待多个线程完成后继续执行的场景。
同步屏障-CyclicBarrier
CyclicBarrier 是什么?
CyclicBarrier 的字面意思是可循环使用的屏障。主要是让一组线程到达一个"屏障"时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
举一个抽象的例子:
- 有一个密室项目,有 5 个密道的门,每道门默认情况下是密封的
- 有 10 个人参与该项目
- 项目的规定是,找到门后必须等待所有参与的人到齐后,门才可以打开,然后进入下一个密道
该例子中:
- 密道的门即:屏障
- 人到齐后才可以开门进入下一个密道即:到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行
CyclicBarrier 如何使用
构造方法参数说明 CyclicBarrier(int parties, Runnable barrierAction)
- parties 是参与线程的个数
- 第二个构造方法有一个 Runnable 参数,这个参数的意思是最后一个到达线程要做的任务
- getNumberWaiting 方法可以获得阻塞的线程数量
- isBroken 方法用来了解阻塞的线程是否被中断
- reset 方法用于重置状态
下面示例中模拟了 5 道门(屏障),线程的睡眠表示每个人找下一道门花费的时间:
- 以上述抽象例子的密室项目为例:
- 每个人花费一定的时间寻找当前的密道门
- 所有人找到当前密道门时,密道门才打开,进入下一密道
public class CyclicBarrierExample {
// 线程池
private final ExecutorService executor = Executors.newFixedThreadPool(5);
// 密室项目 , people 参与人数
public void resolveExcel(int people) {
final CyclicBarrier latch = new CyclicBarrier(people);
for (int i = 0; i < people; i++) {
int pName = i;
executor.submit(() -> {
log.info("{} , 进入密室...", pName);
try {
latch.await(); // 第一道门
// 模拟一个随机时间,作为迷宫的寻找时间
Thread.sleep(new Random().nextInt(1000));
latch.await(); // 第二道门
Thread.sleep(new Random().nextInt(1000));
latch.await(); // 第三道门
Thread.sleep(new Random().nextInt(1000));
latch.await(); // 第四道门
Thread.sleep(new Random().nextInt(1000));
latch.await(); // 第五道门
} catch (InterruptedException | BrokenBarrierException e) {
log.info("等待过程异常");
}
log.info("{} , 逃出密室...", pName);
});
}
}
}
CyclicBarrier 实现原理
CyclicBarrier 内部使用 ReentrantLock 与 Condition 维护等待状态,放行状态。
- ReentrantLock 用于正确修改维护等待状态的变量
- Condition 用于线程的等待通知机制实现
主要处理逻辑:
- 调用 await 的线程判断当前等待数量是否等于屏障放行数量
- 如果不可以放行,当前计数-1,当前线程进入等待状态
- 如果可以放行,重置等待相关变量数据,唤醒所有等待的线程
- 如果调用 reset 方法重置后,等待的状态将变为破坏,等待中的线程将抛出 BrokenBarrierException 异常
应用场景
- 可以用于多线程计算数据,最后合并计算结果的场景。
- 如果计算发生错误,可以重置计数器,并让线程重新执行一次。
CyclicBarrier 与 CountDownLatch 区别
- CountDownLatch 计数器是一次性的,CyclicBarrier 计数器是可循环利用的。
- CountDownLatch 参与的线程的职责是不一样的,有的在倒计时(工作的线程),有的在等待倒计时结束(启动工作的线程)。CyclicBarrier 参与的线程职责是一样的。
控制并发数的信号量-Semaphore
Semaphore 是什么?
Semaphore(信号量) 是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
Semaphore 如何使用
我们模拟一个数据库连接池,池中连接需要根据实际情况控制并发连接。
- 获取连接时拿到一个许可。acquire 方法。
- 释放时归还许可。release 方法。
public abstract class SemaphoreExample {
private final Object[] connections; // 数据库可用连接
private final Semaphore available = new Semaphore(10, true); // 信号量
// 控制访问数量的方式获取可用连接,达到访问数量最大值时,阻塞
public Object getConnection() throws InterruptedException {
available.acquire();
return getNextAvailableConnection();
}
// 放回连接
public void putConnection(Object x) {
if (markAsUnused(x))
available.release();
}
abstract Object getNextAvailableConnection();
abstract boolean markAsUnused(Object connection);
}
Semaphore 还提供一些其他方法:
- intavailablePermits :返回此信号量中当前可用的许可证数
- intgetQueueLength :返回正在等待获取许可证的线程数
- booleanhasQueuedThreads :是否有线程正在等待获取许可证
- reducePermits :减少 reduction 个许可证
- getQueuedThreads :返回所有等待获取许可证的线程集合
Semaphore 实现原理
因为 Semaphore 支持公平与非公平选择,内部实现机制为:
- 内部基于 AbstractQueuedSynchronizer(AQS)实现一个公平与非公平公共的父类 Sync ,用于管理同步状态
- Sync 主要使用共享式锁相关模板方法
- FairSync 继承 Sync 用于处理公平问题
- NonfairSync 继承 Sync 用于处理非公平问题
主要处理逻辑为:
- 初始化 Semaphore 后,同步状态值设置为许可数量
- 调用 acquire 方法后,许可数量 -1(同步状态值-1)
- 调用 release 方法后,许可数量 +1(同步状态值+1)
应用场景
- 流量控制,特别是公用资源有限的应用场景
线程交换数据-Exchanger
Exchanger 是什么
Exchanger(交换者) 是一个线程间协作的工具类,用于进行线程间的数据交换。
它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。
这两个线程通过 exchange 方法交换数据:
- 如果第一个线程先执行 exchange 方法,它会一直等待第二个线程也执行 exchange 方法
- 当两个线程都到达同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
注意:
- 如果偶数个线程执行,可以交换
- 如果奇数个线程执行,可能会剩余最后一个线程一直等待
Exchanger 如何使用
模拟一个示例:
- 比如 2 个计算异步执行,但是他们在执行一段逻辑后需要对方的中间计算结果。
- 在需要对方结果的方法处,调用 exchange 方法即可等待获取。
- 都执行了 exchange 方法后 2 个中间计算结果交换完成。
public class ExchangerExample {
private final Exchanger<String> exchanger = new Exchanger<>();
class TypeA implements Runnable {
public void run() {
log.info("TypeA 执行计算逻辑...");
String a = "TypeA 中间计算结果";
try {
String exchange = exchanger.exchange(a);
log.info("获取 TypeB ={}", exchange);
// 获取 TypeB 内容后继续处理
} catch (InterruptedException ignored) {
}
}
}
class TypeB implements Runnable {
public void run() {
log.info("TypeB 执行计算逻辑...");
String b = "TypeB 中间计算结果";
try {
String exchange = exchanger.exchange(b);
log.info("获取 TypeA ={}", exchange);
// 获取 TypeA 内容后继续处理
} catch (InterruptedException ignored) {
}
}
}
}
如果两个线程有一个没有执行 exchange 方法,则会一直等待,可以使用 public V exchange(V x, long timeout, TimeUnit unit)
设置最大等待时长。
Exchanger 实现原理
Exchanger 用于线程之间两两交换数据,在多线程下,互相交换数据的两个线程是不确定的。
- 在竞争比较小的时候,采用单槽位进行交换数据。当线程来交换数据时,发现槽位为空,则自己在这里等待,否则就和槽位进行交换数据,同时会唤醒等待的线程。
- 在竞争比较激烈的情况下,就会转到多槽位的交换。
- 当一个线程来交换的时候,如果”第一个”槽位是空的,那么自己就在那里等待
- 如果发现”第一个”槽位有等待线程,那么就直接交换,如果交换失败,说明其它线程在进行交换,那么就往后挪一个槽位,
如果有数据就交换,没数据就等一会,但是不会阻塞在这里。在这里等了一会,发现还没有其它线程来交换数据,那么就往“第一个”槽位的方向挪, - 如果反复这样过后,挪到了第一个槽位,没有线程来交换数据了,那么自己就在”第一个”槽位阻塞等待。
- 第一个槽位并不是指的数组中的第一个,而是逻辑第一个,因为存在伪共享,多槽位中,部分空间没有被利用。
参考:《Java并发编程的艺术》、博主「2.wa」的学习专栏笔记Java 线程交换数据-Exchanger_2.wa-CSDN博客