0
点赞
收藏
分享

微信扫一扫

Java-分布式锁

烟中雯城 2023-04-25 阅读 122

分布式锁全家桶

线程安全两种方式:ReentrantLock、synchronized

mysql解决并发性问题

1、jvm本地锁(ReentrantLock、synchronized) 600

三种情况导致锁失效
  1. 多例模式(解决方式:service业务使用单例)
  2. 事务 (解决方式:可以使用提交读,READ_UNCOMMITTED)
  3. 集群部署(解决方式:使用增删改的时候是自动加锁的,就是使用sql语句)

2、一个sql语句:更新数量时判断 2000(表级锁)

解决:三种情况锁失效的问题

问题:
  1. 锁的范围问题(表级锁和行级锁)where中的过滤条件列,如果用索引,锁行,无法用索引,锁表。(参考悲观锁详解)
  2. 同一个商品多条记录
  3. 无法记录库存前后的状态

3、悲观锁:select……for update 600

mysql悲观锁种使用行级锁条件
  1. 锁的查询或者更新条件必须是索引字段
  2. 查询或更新条件必须是精确具体值(不然就变成表级锁了)
存在问题
  1. 性能问题
  2. 死锁问题,对多条数据加锁,操作的对象加锁顺序要一致。
  3. 库存操作要统一:select ...for update 与普通的select...一个不加锁。

4、乐观锁:时间戳version版本号 CAS机制

问题
  1. 高并发情况下,性能极低
  2. ABA问题
  3. 读写分离情况下导致乐观锁不可靠,因为要经过多从IO,存在网络延时。

MySQL锁总结

性能:一个sql>悲观锁>jvm锁>乐观锁

如果追求极致性能,业务场景简单并且不需要记录数据前后变化的情况下。 优先选择:一个sql。

如果写并发量较低(多读),争抢不是很激烈的情况下。优先选择:乐观锁

如果写并发量较高,一般会经常冲突,此时选择乐观锁的话,会导致业务代码不间断的重试。优先选择:mysql悲观锁

不推荐jvm本地锁

redis乐观锁

1、JVM本地锁机制(集群不可用)

2、redis乐观锁

watch:可以监控一个或多个key的值,如果在事务(exec)执行之前,key的值发生改变则取消事务执行

multi:开启一个事务;

exec:执行一个事务;

问题:性能极低 400 不推荐

3、分布式锁 跨进程 跨服务 跨服务器

场景:超卖现象(NoSQL)、缓存击穿

分布式锁的实现方法

1、基于redis实现

特征:

1、独占排他使用 setnx

2、防死锁的发生

如果redis客户端程序从redis服务中获取到锁之后立马宕机,产生死锁。

解决:给锁添加过期时间。

不可重入:保证可重入性解决死锁(A方法中调B方法俩个方法是同一个锁,产生死锁)

3、原子性

实现获取锁和过期时间之间原子性:set key value ex 3 nx

实现判断和释放锁之间原子性:lua脚本

4、防误删:一个请求删除另外一个锁的请求(提供uuid)

先判断,在删除

5、保证可重入性 hash+lua脚本实现可重入性

6、自动续期 :判断是否是自己的锁,如果发现是自己的锁重置过期时间

7、在集群情况下,导致机制失效:

  • 客户端程序10010,从主中获取锁
  • 还没来得急同步数据,主挂了
  • 于是从升级为主
  • 客户端程序10086就从新主中获取到锁,导致锁机制失效
RedLock算法

1、应用程序获取系统当前时间

2、应用程序使用相同的kv值依次从多个redis实例中获取锁,如果某一个节点超过一定时间依旧没有获取锁则直接放弃,尽快尝试从下一个健康的redis节点获取锁,以避免被一个宕机了的节点阻塞。

3、计算获取锁的消耗时间=客户端程序的系统当前时间—step1中的时间。获取锁的消耗时间小于总的锁定时间(30s),并且半数以上的节点获取锁成功,则认为获取锁成功

4、剩余的锁定时间=总的锁定时间—获取锁的消耗时间

5、如果获取锁失败了,针对所有的节点释放锁。

ReentrantLock底层原理

可重入加锁加锁流程
ReentrantLock.lock() -> NonfairSync.lock()->AQS.acquire(1)->NonfairSync.tryAcquire(1)->
Sync.nonfairTryAcquire(1)
1、CAS获取锁,如果没有线程占用锁(state == 0) state设置为1,加锁成功并记录当前线程是有锁线程(两次)
2、如果state的值不为0,说明锁已经被占用,则判断当前线程是否是有锁线程,如果是则重入(state+1)
3、加锁失败,入队等待
可重入加锁解锁流程
ReentrantLock.unlock()->AQS.release()->Sync.tryRelease(1)
1、判断当前线程是否是有锁线程,不是则抛出异常
2、对state的值减1后判断state的值是否为0,为0则解锁成功,返回true
3、如果减1后的值不为0,则返回false
参照ReentrantLock中的非公平可重入锁实现分布式可重入锁:Hash+Lua脚本

参考ReetrantLock中的非公平可重入锁实现redis分布式可重入锁:hash+lua脚本

加锁

1、判断锁是否存在(exists),则直接获取锁hset/hexists key field value

2、如果锁存在则判断是否是自己的锁(hexists),如果是自己的锁则重入,递增加一(hincrby key field increment)

3、如果锁存在不是自己的锁,否则重试:递归,循环

if redis.call('exists','lock') == 0
then 
	redis.call('hset','lock',uuid,1)
	redis.call('expire','lock',30)
	return 1
elseif redis.call('hexists','lock',uuid) == 1
then
	redis.call('hincrby','lock',uuid,1)
	redis.call('expire','lock',30)
	return 1
else
	return 0
end
写法2
if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1
then 
	redis.call('hincrby',KEYS[1],ARGV[1],1)
	redis.call('expire',KEYS[1],ARGV[2])
	return 1
else
	return 0
end

keys(静态):lock,
argv(动态):uuid,30

if redis.call('exists',KEYS[1]) == 0 or redis.call('hexists',KEYS[1],ARGV[1]) == 1 then redis.call('hincrby',KEYS[1],ARGV[1],1) redis.call('expire',KEYS[1],ARGV[2]) return 1 else return 0 end

解锁

1、判断自己的锁是否存在,不存在则返回nil

2、如果自己的锁存在,则减1(hincrby -1),判断-1后的值是否为0,为0则释放锁(del)返回1

3、不为0则返回0

if redis.call('hexists',KEYS[1],ARGV[1]) == 0
then 
	return nil;
elseif redis.call('hincrby',KEYS[1],ARGV[1],-1) == 0
then
	return redis.call('del',KEYS[1])
else
	return 0
end

if redis.call('hexists',KEYS[1],ARGV[1]) == 0 then return nil elseif redis.call('hincyby',KEYS[1],ARGV[1],-1) == 0 then return redis.call('del',KEYS[1]) else return 0 end

自动续期

定时任务(时间驱动)+lua脚本+

lua脚本

一次性发送多个指令给redis,redis单线程执行遵守one-by-one规则


script:lua脚本字符串

numkeys:key列表的元素数量

key列表:以空格分割 从1开始

arg列表:以空格分割 从1开始

变量:

全局变量:a = 1

局部变量:local a = 2;

分支控制

if 条件

then

代码块

else

代码块

end

if redis.call('get','lock') == uuid
then 
	return redis.call('del','lock')
else
	return 0
end

操作
  • 加锁,setnx
  • 重试,递归
  • 解锁,del
Redisson锁
概述

Redisson是一个在Redis的基础上实现的Java驻内存数据网格(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离,从而让使用者能够将精力更集中地放在处理业务逻辑上。

Redisson和jedis以及lettuce一样都是redis客户端,只不过Redisson功能更强大。

官方文档:https://github.com/redisson/redisson/wiki/目录

环境搭建

redisson相关依赖

<!-- 以后使用redisson作为所有分布式锁,分布式对象等功能框架-->
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.12.0</version>
</dependency>

配置redisson,程序化的配置方法是通过构建Config对象实例实现

package com.example.distribyted.lock.config;

import org.redisson.Redisson;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RedissonConfig {
    @Bean
    public RedissonClient redissonClient(){
        Config config = new Config();

        config
                .useSingleServer()//设置redis模式(单机、集群)
                .setAddress("redis://127.0.0.1:6379");//redis服务器地址
//                .setDatabase(0)//设置redis数据库编号
//                .setUsername().setPassword()设置用户名和密码
//                .setConnectionMinimumIdleSize(10);//连接池最小空闲线程数
//                .setConnectionPoolSize(50);//连接池最大线程数数
//                .setIdleConnectionTimeout(60000);//线程超时时间
//                .setConnectTimeout();//客户端程序获取redis连接的超时时间
//                .setTimeout();//响应超时时间
        RedissonClient redissonClient = Redisson.create();
        return redissonClient;
    }
}

可重入锁

@ResponseBody
@GetMapping("/hello")
public String hello(){
    //获取一把锁
    RLock lock = redissonClient.getLock("my-lock");

    //加锁
    lock.lock();
    //锁的自动续期,如果业务执行时间超长,运行期间会自动给锁续期30秒时间,不用担心业务时间长,锁自动过期
    //加锁的业务只要运行完成,就不会给当前锁续期,即使不手动解锁,锁默认在30秒后也会自动删除
    try {
        System.out.println("加锁成功,执行业务.... "+Thread.currentThread().getId());
        Thread.sleep(30000);
    } catch (InterruptedException e) {
        e.printStackTrace();
    } finally {
        //手动解锁
        System.out.println("解锁..."+Thread.currentThread().getId());
        lock.unlock();
    }
    return "hello";
}

  • lock.lock()既没有指定锁的过期时间就是默认是30s,只要设置锁成功,就会启动一个定时任务,每隔10秒就会自动重新续期到30s。
  • lock.lock(10,TimeUnit.SECONDS),默认锁的过期时间就是我们指定的时间。
公平锁(队列先进先出)

基于Redis的Redisson分布式可重入公平锁也是实现了java.util.concurrent.locks.Lock接口的一种RLock对象。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。它保证了当多个Redisson客户端线程同时请求加锁时,优先分配给先发出请求的线程。所有请求线程会在一个队列中排队,当某个线程出现宕机时,Redisson会等待5秒后继续下一个线程,也就是说如果前面有5个线程都处于等待状态,那么后面的线程会等待至少25秒。

RLock fairLock = redisson.getFairLock("anyLock");
// 最常见的使用方法
fairLock.lock();

读写锁

基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。

分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。

读锁

@GetMapping("/read")
public String readValue(){
    RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
    String s = "";
    //加读锁
    RLock rLock = lock.readLock();
    rLock.lock();
    try {
        System.out.println("读锁加锁成功"+Thread.currentThread().getId());
        s = redisTemplate.opsForValue().get("writeValue");
        Thread.sleep(30000);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
        System.out.println("读锁释放"+Thread.currentThread().getId());
    }
    return  s;
}

写锁

@GetMapping("/write")
public String writeValue(){
    // 获取一把锁
    RReadWriteLock lock = redissonClient.getReadWriteLock("rw-lock");
    String s = "";
    // 加写锁
    RLock rLock = lock.writeLock();
    try {
        //1、改数据加写锁,读数据加读锁
        rLock.lock();
        System.out.println("写锁加锁成功..."+Thread.currentThread().getId());
        s = UUID.randomUUID().toString();
        Thread.sleep(30000);
        redisTemplate.opsForValue().set("writeValue",s);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        rLock.unlock();
        System.out.println("写锁释放"+Thread.currentThread().getId());
    }
    return  s;
}

测试

  • 先加写锁,后加读锁,此时并不会立刻给数据加读锁,而是需要等待写锁释放后,才能加读锁
  • 先加读锁,再加写锁:有读锁,写锁需要等待
  • 先加读锁,再加读锁:并发读锁相当于无锁模式,会同时加锁成功
信号量

基于Redis的Redisson的分布式信号量(Semaphore)Java对象RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。同时还提供了异步(Async)、反射式(Reactive)和RxJava2标准的接口。

RSemaphore semaphore = redisson.getSemaphore("semaphore");
semaphore.acquire();
//或
semaphore.acquireAsync();
semaphore.acquire(23);
semaphore.tryAcquire();
//或
semaphore.tryAcquireAsync();
semaphore.tryAcquire(23, TimeUnit.SECONDS);
//或
semaphore.tryAcquireAsync(23, TimeUnit.SECONDS);
semaphore.release(10);
semaphore.release();
//或
semaphore.releaseAsync();

Redisson小结

redisson:redis的Java客户端,分布式锁

玩法

2、基于zookeeper/etcd实现

3、基于mysql实现(唯一键索引)

思路:

加锁:Insert into tb_lock(lock_name) values('lock'),执行成功代表获取锁成功
释放锁:执行成功的请求执行业务操作,执行完成之后通过delete删除对应记录
重试:递归
1、独占排他互斥,唯一键索引
2、防死锁:
	客户端程序获取锁之后,客户端程序的服务器宕机。给锁一个获取锁的时间列,定时任务发现进行解锁(服务的定时器来检查)
	不可重人:可重入,可参考redis分布式锁的hash+lua脚本,先记录服务信息以及线程信息 重入次数来实现
3、防误删:借助于id的唯一性来防止
4、原子性:新增删除自带锁,还可以借助于MySQL的悲观锁
5、可重复入:
6、自动续期 服务器内的定时器来重置锁的系统的时间
7、单机故障,搭建mysql的主备

总结

1、简易程序:mysql>redis>zookeeper

2、性能

redis>zookeeper>mysql

3、可靠性

zk>redis=mysql

举报

相关推荐

0 条评论