需要锁的原因:
- 多任务去抢一个资源
- 多任务都需要对资源进行写操作
- 多任务对资源的访问是互斥的
1、不使用锁在多线程的情况下会出现什么情况
定义一个生成订单ID类,也就是多任务去争抢的资源
public class OrderNumGenerator {
//全局订单id
public static int count = 0;
//生成订单ID
public String getNumber() {
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return simpt.format(new Date()) + "-" + ++count;
}
}
使用并发工具栏CountDownLatch,启动500个线程去调用资源
public class OrderService implements Runnable {
private OrderNumGenerator orderNumGenerator = new OrderNumGenerator();
//发令枪,模拟500个并发
private static CountDownLatch countDownLatch = new CountDownLatch(500);
private static List<String> result = new Vector<String>();
public void run() {
try {
countDownLatch.await();
result.add(orderNumGenerator.getNumber());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) throws InterruptedException {
System.out.println("####生成唯一订单号###");
for (int i = 0; i < 500; i++) {
new Thread(new OrderService()).start();
countDownLatch.countDown();
}
countDownLatch.await();
Thread.sleep(1000);
Collections.sort(result);
for(String str:result) {
System.out.println(str);
}
}
}
结果发现,出现重复的订单ID
如果不是在分布式的环境下,我们通常采用synchronized或者Lock的方式加锁,来解决这个问题
2、synchronized
直接在资源,也就是生成订单ID的代码中加上一把锁
public class OrderNumGenerator {
//全局订单id
public static int count = 0;
public static Object lock = new Object();
public String getNumber() {
synchronized(lock){
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
return simpt.format(new Date()) + "-" + ++count/*+"_"+Thread.currentThread().getId()*/;
}
}
}
3、Lock方式
同样在资源上加锁,即可解决问题
public class OrderNumGenerator {
//全局订单id
public static int count = 0;
private java.util.concurrent.locks.Lock lock = new ReentrantLock();
//以lock的方式解决
public String getNumber() {
try {
lock.lock();
SimpleDateFormat simpt = new SimpleDateFormat("yyyy-MM-dd-HH-mm-ss");
String s = simpt.format(new Date()) + "-" + ++count;
return s;
}finally {
lock.unlock();
}
}
}
但是上述两种方式,在分布式环境中解决,在了解分布式锁之前, 先了解下模板方法
即:在父类中定义主流程,主流程中某些方法可以延迟到子类去实现
4、模板方法
父类
public abstract class FatherTemplate {
public void A() {
System.out.println("A");
}
public abstract void B() ;
public void C() {
System.out.println("C");
}
public void D() {
A();
B();
C();
}
}
子类
public class SonTemplate extends FatherTemplate{
@Override
public void B() {
System.out.println("B");
return;
}
public static void main(String[] args) {
FatherTemplate sonTemplate = new SonTemplate();
sonTemplate.D();
}
}
这样不同的子类,都可以定义自己的B方法,在实际的调用过程中,调用自己的B方法
5、zookeeper实现分布式锁
定义一个接口
//接口
public interface Lock {
//获取到锁的资源
public void getLock();
// 释放锁
public void unLock();
}
定义一个模板
public abstract class AbstractLock implements Lock{
public void getLock() {
if (tryLock()) {
System.out.println("##获取锁####");
} else {
waitLock();
getLock();
}
}
public abstract boolean tryLock();
public abstract void waitLock();
}
接下来使用zookeeper实现tryLock()、waitLock()、unLock()
tryLock():使用zookeeper创建一个临时节点,如果成功,代表获取到锁
waitLock():使用zookeeper注册一个该临时节点的监听,并且使用到并发工具类,如果该节点已存在,则等待;当监听到它被删除时,继续执行
unLock():删除节点
public class ZookeeperDistrbuteLock extends ZookeeperAbstractLock {
private CountDownLatch countDownLatch = null;
@Override
//尝试获得锁
public boolean tryLock() {
try {
zkClient.createEphemeral(PATH);
return true;
} catch (Exception e) {
//如果创建失败报出异常
return false;
}
}
@Override
public void waitLock() {
IZkDataListener izkDataListener = new IZkDataListener() {
public void handleDataDeleted(String path) throws Exception {
// 唤醒被等待的线程
if (countDownLatch != null) {
countDownLatch.countDown();
}
}
public void handleDataChange(String path, Object data) throws Exception {
}
};
// 注册事件
zkClient.subscribeDataChanges(PATH, izkDataListener);
//如果节点存
if (zkClient.exists(PATH)) {
countDownLatch = new CountDownLatch(1);
try {
//等待,一直等到接受到事件通知
countDownLatch.await();
} catch (Exception e) {
e.printStackTrace();
}
}
// 删除监听
zkClient.unsubscribeDataChanges(PATH, izkDataListener);
}
public void unLock() {
//释放锁
if (zkClient != null) {
zkClient.delete(PATH);
zkClient.close();
System.out.println("释放锁资源...");
}
}
}
这样就可以使用我们自己实现的zookeeper分布式锁
private Lock lock = new ZookeeperDistrbuteLock();
public void getNumber() {
try {
lock.getLock();
//业务操作
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unLock();
}
}
6、zookeeper分布式锁优化
上述方法存在羊群效应,即一个线程拿到锁,其他线程都在等待,当释放锁的时候,所有的其他线程都会去抢
tryLock():使用zookeeper创建一个临时有序节点,并且当前节点在所有的临时有序节点中排第一,才是获取到锁;如果不是第一,则查找它前面节点
waitLock():使用zookeeper注册一个前面节点的监听,当监听到前面节点被删除时,则继续执行
unLock():删除节点
这样根据多任务的请求顺序,创建多个锁,每一个都监听前一个锁是否被删除,如果监听到删除则继续执行,去尝试拿锁。就不会出现羊群效应了
public class ZookeeperDistrbuteLock2 extends ZookeeperAbstractLock {
private CountDownLatch countDownLatch= null;
private String beforePath;//当前请求的节点前一个节点
private String currentPath;//当前请求的节点
public ZookeeperDistrbuteLock2() {
if (!this.zkClient.exists(PATH2)) {
this.zkClient.createPersistent(PATH2);
}
}
@Override
public boolean tryLock() {
//如果currentPath为空则为第一次尝试加锁,第一次加锁赋值currentPath
if(currentPath == null || currentPath.length()<= 0){
//创建一个临时顺序节点
currentPath = this.zkClient.createEphemeralSequential(PATH2 + '/',"lock");
}
//获取所有临时节点并排序,临时节点名称为自增长的字符串如:0000000400
List<String> childrens = this.zkClient.getChildren(PATH2);
Collections.sort(childrens);
if (currentPath.equals(PATH2 + '/'+childrens.get(0))) {//如果当前节点在所有节点中排名第一则获取锁成功
return true;
} else {//如果当前节点在所有节点中排名中不是排名第一,则获取前面的节点名称,并赋值给beforePath
int wz = Collections.binarySearch(childrens,
currentPath.substring(7));
beforePath = PATH2 + '/'+childrens.get(wz-1);
}
return false;
}
@Override
public void waitLock() {
IZkDataListener listener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
if(countDownLatch!=null){
countDownLatch.countDown();
}
}
public void handleDataChange(String dataPath, Object data) throws Exception {
}
};
//给排在前面的的节点增加数据删除的watcher,本质是启动另外一个线程去监听前置节点
this.zkClient.subscribeDataChanges(beforePath, listener);
if(this.zkClient.exists(beforePath)){
countDownLatch=new CountDownLatch(1);
try {
countDownLatch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.zkClient.unsubscribeDataChanges(beforePath, listener);
}
public void unLock() {
//删除当前临时节点
zkClient.delete(currentPath);
zkClient.close();
}
}