一、直接开始谈Lock
【简单谈一下 Lock使用,实现类源码和原理,点到为止(在说完AQS时可以回过头谈一谈 ReentrantReadWriteLock 原理或者自己设计一个锁或者AQS怎么做),抛出JDK8中新的优化类,证明咱技术还可以,快速过,暂时重点放在后面 AQS 上面】
Lock是JDK提供给我们的显示锁,一般我们在try块前使用 lock() 方法获取锁,finally 块中调用unlock() 释放锁,通常除了获取释放,我们还使用 lockInterruptibly() 可中断地获取锁,和lock()方法的不同之处在于该方法会响应中断,即在锁的获取中可以中断当前线程;还有 tryLock() 尝试非阻塞的获取锁,调用该方法后立刻返回,如果能够获取则返回true,否则返回false;
lock接口中 ReentrantLock 是一个重要的而且我经常用的一个实现类,ReentrantLock在调用lock()方法时,已经获取到锁的线程,能够再次调用lock()方法获取锁而不被阻塞。ReentrantLock提供了一个构造函数,能够控制锁是否是公平的。
/**
* Creates an instance of {@code ReentrantLock} with the given fairness policy.
* @param fair {@code true} if this lock should use a fair ordering policy
*/
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
除此之外还有 ReentrantReadWriteLock 读写锁,它是实现的是ReadWriteLock接口,一般读多写少的时候会使用它,不过需要注意线程饥饿问题。
【亮点】Java8中的新提供了一个 StampLock,它是读写锁的一个改进版本,第一,它提供了一种乐观的读策略,这种乐观策略的锁非常类似于无锁的操作,使得乐观锁完全不会阻塞写线程;第二优化了读不阻塞写:就是说在读的时候如果发生了写,则应当重读而不是在读的时候直接阻塞写,但是写和写之间还是阻塞的。
public class RwLock {
private int Integer;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
private final Lock getLock = lock.readLock();//读锁
private final Lock setLock = lock.writeLock();//写锁
public Integer getNum() {
getLock.lock();
try {
return this.Integer; // 只读
} finally {
getLock.unlock();
}
}
public void setNum(int number) {
setLock.lock();
try {
Integer++; // 写入
} finally {
setLock.unlock();
}
}
}
二、开始谈AQS
【先谈AQS的关注点表示你自己知道AQS在Lock体系总的作用,再谈使用、设计模式、数据结构、源码实现等等扩展点,最后可以】
AQS全称AbstractQueuedSynchronizer,意思就是抽象队列同步器,在Lock实现类中大量使用,那我们关注点AQS是如何实现,当我多个线程要等待一把锁的时候,JDK是如何安排这些线程,在AQS中进行等待;当锁被释放的时候,又如何进行唤醒的。
【总】针对关注点,了解到,AQS是用来构建锁或者其他同步组件的基础框架,它使用了一个int 成员变量 state 表示同步状态,通过内置的FIFO队列来完成资源获取线程的排队工作。
【分】AQS的主要使用方式是继承,子类通过继承 AQS 并实现它的抽象方法来管理同步状态, AQS为保证线程安全,提供的3个方法 getState()、setState(int newState) 和 compareAndSetState(int expect,int update);AQS自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法,来供自定义同步组件使用,AQS既可以支持独占式、共享式地获取同步状态,例如 ReentrantLock、 ReentrantReadWriteLock 和 CountDownLatch 等都在内部使用AQS管理线程。实现者需要继承 AQS 并重写指定的方法,随后将AQS组合在自定义同步组件的实现中,并调用AQS提供的 模板方法,而这些模板方法将会调用使用者重写的方法。
AQS内部方法主要分为,模板方法、可重写的方法和访问或修改同步状态的方法三类,模板方法主要有 acquire(int arg) 方法独占式获取同步状态,release(int arg) 方法独占式的释放同步状态;可重写方法有 tryAcquire(int arg) 独占式获取同步状态,tryRelease() 方法等;访问或修改同步状态的方法就上面说的三种,有 getState() 方法获取当前同步状态。 setState(int newState) 方法设置当前同步状态。和 compareAndSetState(int expect,int update) 方法使用CAS设置当前状态。
AQS中是通过内部类 Node 来维护一个CLH队列 的,CLH队列锁也是一种基于链表的可扩展、高性能、公平的自旋锁,申请线程仅仅在本地变量上自旋,它不断轮询前驱的状态,假设发现前驱释放了锁就结束自旋;内部节点Node主要 head 和 tail 一个指向队列头节点,而另一个指向队列尾节点;Condition等待队列 ,一个Condition包含一个等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)。Condition有两个重要方法 await() 和 signal() , await() 方法(或者以await开头的方法),会使当前线程进入等待队列并释放锁, 同时线程状态变为等待状态。将会以当前线程构造节点,并将节点从尾部加入等待队列。当从await()方法返回时,当前线程一定获取了Condition相关联的锁。调用Condition的 signal() 方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。
三、在谈Lock原理
【锁的可重入 原理 ,读写锁原理】
ReentrantLock 锁的可重入 ,是 nonfairTryAcquire() 方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回true,表示获取同步状态成功。同步状态表示锁被一个线程重复获取的次数。如果该锁被获取了n次,那么前(n-1)次tryRelease(int releases)方法必须返回false
ReentrantReadWriteLock 的实现,是读写锁将变量切分成了两个部分,高16位表示读,低16位表示写,假设当前同步状态值为S,写状态等于S&0x0000FFFF(将高16位全部抹去),读状态等于S>>>16(无符号补0右移16位)。当写状态增加1时,等于S+1,当读状态增加1时,等于S+(1<<16),也就是S+0x00010000。根据状态的划分能得出一个推论:S不等于0时,当写状态(S&0x0000FFFF)等于0时,则读状态(S>>>16)大于0,即读锁已被获取。写锁一旦被获取,则其他读写线程的后续访问均被阻塞。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,依靠CAS保证)增加读状态。
四、本文扩展
自己实现一个 AQS、实现一个自己的 独占锁(基于AQS实现锁)、实现一个 可重入锁
【自己实现一个AQS】
应该关心些什么信息?
1、线程信息,肯定要知道我是哪个线程;
2、队列中线程状态,既然知道是哪一个线程,肯定还要知道线程当前处在什么状态,是已经取消了“获锁”请求,还是在“”等待中”,或者说“即将得到锁”
3、前驱和后继线程,因为是一个等待队列,那么也就需要知道当前线程前面的是哪个线程,当前线程后面的是哪个线程(因为当前线程释放锁以后,理当立马通知后继线程去获取锁)。
import tool.SleepTools;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 实现一个自己的AQS
* 思路:1.定义一个带锁状态的Node
* 2.定义一个FIFO阻塞队列,用以存放阻塞的Node
* 3.释放锁时,设置锁状态为false,并删除结点
*
* @author Cai
* @date 2021/8/15-19:49
*/
public class CLHLock implements Lock {
// 原子读写的对象引用变量,FIFO阻塞队列
AtomicReference<Node> tail;
// 线程的前驱结点
ThreadLocal<Node> myPred;
// 线程的当前结点
ThreadLocal<Node> myNode;
public CLHLock() {
// 创建的默认队尾结点,默认是释放锁的状态(locked为false)
tail = new AtomicReference<Node>(new Node(false));
myPred = ThreadLocal.withInitial(() -> null);
myNode = ThreadLocal.withInitial(() -> new Node(false));
}
public void lock() {
System.out.println(Thread.currentThread().getName() + " ready get lock");
// 给当前线程 创建一个节点Node 例如:1,2同时进来 Node1,Node2
Node node = myNode.get();
// 当前线程需要获得锁,给当前线程设置状态为true 例如:1,2都为true
node.locked = Boolean.TRUE;
// 获取取到前趋节点Node pred,然后设置当前线程的节点到队尾 例如:1先到到,tail(false)->1(true)->2(true)
Node pred = tail.getAndSet(node);
// 设置当前线程的 前趋节点Node pred 例如:1的前驱就是tail(false),tail(false)->1(true)
myPred.set(pred);
// 一直阻塞,直到前趋节点pred释放锁(pred.locked为false时可以获取锁)
while (pred.locked) {
}
System.out.println(Thread.currentThread().getName() + "---already got lock---");
}
public void unlock() {
System.out.println(Thread.currentThread().getName() + " ready release lock");
// 获取当前持有锁的 线程的 Node节点
Node node = myNode.get();
// 设置当前线程释放锁的状态为 false
node.locked = Boolean.FALSE;
// 解除myNode绑定在当前线程上,即使下次当前线程再次需要获得锁时则再创建一个Node。
myNode.remove();
myPred.remove();
System.out.println(Thread.currentThread().getName() + "---already released lock---");
}
/**
* 模拟AQS
*/
private static class Node {
volatile boolean locked; // 锁状态
Node(boolean locked) {this.locked = locked;}
}
public static void main(String[] args) {
// 创建实现的独占锁
final Lock lock = new CLHLock();
// 创建工作线程
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println("当前持有锁的线程:" + Thread.currentThread().getName());
try {
SleepTools.second(1); // 休眠1s
} finally {
lock.unlock();
}
}
}
// 启动4个子线程
for (int i = 0; i < 4; i++) {
Worker w = new Worker();
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
}
}
}
【自己实现一个自己的独占锁(基于AQS实现锁)】
重写AQS经验总结
同步组件的实现依赖同步器,使用AQS的方式被推介定义 继承AQS的静态内部类
AQS采用模板方法设计,AQS的protected方法需要子类重写,当调用模板方法时就可以调用被重写方法
AQS负责同步状态的管理,线程的排队,等待唤醒等底层操作,而自定义同步组件主要专注于 实现同步语义
重写AQS时,使用AQS提供的getState().setState(),compareAndSetState()修改同步状态
import tool.SleepTools;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* 实现一个自己的独占锁
* 思路: 1.实现Lock接口
* 2.使用AQS实现继承自接口中的方法
*
* @author Cai
* @date 2021/8/15-18:56
*/
public class SelfLock implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
/*判断处于占用状态*/
protected boolean isHeldExclusively() {
// 获取当前同步状态,
return getState() == 1;
}
/*获得锁*/
protected boolean tryAcquire(int arg) {
// 使用CAS设置当前状态,该方法能够保证状态设置的原子性
if (compareAndSetState(0, 1)) {
// 设置当前拥有独占访问权限的线程
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
/*释放锁*/
protected boolean tryRelease(int arg) {
if (getState() == 0) {
throw new IllegalMonitorStateException();
}
setExclusiveOwnerThread(null);
setState(0);
//compareAndSetState(1,0);
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
// 竞争锁的队列,如果没竞争成功,会进入队列
System.out.println(Thread.currentThread().getName() + " ready get lock");
// 独占式获取同步状态,如果当前线程获取同步状态成功,则由该方法返回,否则,将会进入同步队列等待
sync.acquire(1);
System.out.println(Thread.currentThread().getName() + "---already got lock---");
}
public boolean tryLock() {
// 独占式获取同步状态,实现该方法需要查询当前状态并判断同步状态是否符合预期,然后再进行CAS设置同步状态
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName() + " ready release lock");
// 独占式的释放同步状态,该方法会在释放同步状态之后,将同步队列中第一个节点包含的线程唤醒
sync.release(1);
System.out.println(Thread.currentThread().getName() + "---already released lock---");
}
/**
* 等待队列
* 一个Condition包含一个FIFO等待队列,Condition拥有首节点(firstWaiter)和尾节点(lastWaiter)
* 在队列中的每个节点都包含了一个线程引用,该线程就是在Condition对象上等待的线程,
* 节点的定义复用了AQS中节点的定义,也就是说,同步队列和等待队列中节点类型都是AQS的静态内部类。
*/
public Condition newCondition() {
return sync.newCondition();
}
public void lockInterruptibly() throws InterruptedException {
// 与acquire(int arg)相同.但是该方法响应中断,当前线程未获取到同步状态而进入同步队列中,如果当前线程被中断,则会抛出InterruptedException并返回
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
// 在acquircInterruptibly(int arg)基础上增加了超时限制,如果当前线程在超时时间内没有获取到同步状态,那么将会返回false.如果获取到了返回true
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public static void main(String[] args) {
// 创建实现的独占锁
final Lock lock = new SelfLock();
// 创建工作线程
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println("当前持有锁的线程:"+Thread.currentThread().getName());
try {
SleepTools.second(1); // 休眠1s
} finally {
lock.unlock();
}
}
}
// 启动4个子线程
for (int i = 0; i < 4; i++) {
Worker w = new Worker();
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
}
}
}
输出效果:
Thread-0 ready get lock
Thread-3 ready get lock
Thread-0---already got lock---
Thread-2 ready get lock
Thread-1 ready get lock
当前持有锁的线程:Thread-0
Thread-0 ready release lock
Thread-3---already got lock---
当前持有锁的线程:Thread-3
Thread-0---already released lock---
Thread-3 ready release lock
Thread-3---already released lock---
Thread-2---already got lock---
当前持有锁的线程:Thread-2
Thread-2 ready release lock
Thread-2---already released lock---
Thread-1---already got lock---
当前持有锁的线程:Thread-1
Thread-1 ready release lock
Thread-1---already released lock---
【实现一个可重入锁】
import tool.SleepTools;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
/**
* @author Cai
* @date 2021/8/15-23:27
*/
public class ReenterSelfLock implements Lock {
// 静态内部类,自定义同步器
private static class Sync extends AbstractQueuedSynchronizer {
// 是否处于占用状态
protected boolean isHeldExclusively() {
return getState() > 0;
}
// 当状态为0的时候获取锁
public boolean tryAcquire(int acquires) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
} else if (getExclusiveOwnerThread() == Thread.currentThread()) {
setState(getState() + 1);
return true;
}
return false;
}
// 释放锁,将状态设置为0
protected boolean tryRelease(int releases) {
if (getExclusiveOwnerThread() != Thread.currentThread()) {
throw new IllegalMonitorStateException();
}
if (getState() == 0)
throw new IllegalMonitorStateException();
setState(getState() - 1);
if (getState() == 0) {
setExclusiveOwnerThread(null);
}
return true;
}
// 返回一个Condition,每个condition都包含了一个condition队列
Condition newCondition() {
return new ConditionObject();
}
}
// 仅需要将操作代理到Sync上即可
private final Sync sync = new Sync();
public void lock() {
System.out.println(Thread.currentThread().getName() + " ready get lock");
sync.acquire(1);
System.out.println(Thread.currentThread().getName() + " already got lock");
}
public boolean tryLock() {
return sync.tryAcquire(1);
}
public void unlock() {
System.out.println(Thread.currentThread().getName() + " ready release lock");
sync.release(1);
System.out.println(Thread.currentThread().getName() + " already released lock");
}
public Condition newCondition() {
return sync.newCondition();
}
public boolean isLocked() {
return sync.isHeldExclusively();
}
public boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public void lockInterruptibly() throws InterruptedException {
sync.acquireInterruptibly(1);
}
public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
return sync.tryAcquireNanos(1, unit.toNanos(timeout));
}
public static void main(String[] args) {
// 创建实现的独占锁
final Lock lock = new ReenterSelfLock();
// 创建工作线程
class Worker extends Thread {
public void run() {
lock.lock();
System.out.println("当前持有锁的线程:" + Thread.currentThread().getName());
try {
SleepTools.second(1); // 休眠1s
} finally {
lock.unlock();
}
}
}
// 启动4个子线程
for (int i = 0; i < 4; i++) {
Worker w = new Worker();
w.start();
}
// 主线程每隔1秒换行
for (int i = 0; i < 10; i++) {
SleepTools.second(1);
}
}
}