0
点赞
收藏
分享

微信扫一扫

基于redis的分布式锁


为什么要使用分布式锁?

因为服务器使用了集群方案。词穷。。。

怎么使用分布式锁?

需求

实现一个查询数据库,在大于0的情况下减库存这样小小的功能。

测试:模拟100并发并看结果

基础代码

没有任何锁


@RequestMapping("/reduce_stock")
public String reduceStock() {
//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}


return "helloworld";
}

用测压工具测压结果:出现并发问题

基于redis的分布式锁_数据库

有锁:给方法添加synchronized关键字

    @RequestMapping("/reduce_stock")
public synchronized String reduceStock() {
//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}
return "helloworld";
}

单机测试结果:没有问题

分布式测试结果:出现线程安全问题

分析,如下图所示:

两个微服务,synchronized关键字只能锁住一个微服务,跨微服务是锁不住的。

就像你家的屋子A复制一份为B,A是否锁门和B是否锁门是没有关系的。

基于redis的分布式锁_基于redis的分布式锁_02

基于redis的分布式锁(理论+实操)

理论

基于redis的分布式锁_基于redis的分布式锁_03

基于redis的setnx命令实现分布式锁

setnx命令的特点是:当你第一次设置的时候会返回1,后面在设置的时候就会返回0(即修改失败),如下图所示

基于redis的分布式锁_redis_04

手写基于redis分布式锁(此处逻辑、理论大于实操)

一代代码

分析

---逻辑:先获取锁,如果获取锁,就继续;否则就不执行

---问题:容易出现死锁。如果我获取锁成功后在执行业务逻辑的过程中出现异常,则释放锁的过程就没有了,不释放锁就会引起死锁

   @RequestMapping("/reduce_stock")
public String reduceStock() {

//key的名称
String lockKey = "lock";
//setnx key value 加锁逻辑
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if (!aBoolean){
return "fail";
}

//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}

// del key 释放锁逻辑
stringRedisTemplate.delete(lockKey);

return "helloworld";
}

二代代码

分析:

---优点:在finally中释放锁,解决了死锁的问题

---问题:引起锁失效问题。看下面的代码,先加锁,如果加锁失败,返回,但是此时代码也会去执行finally中释放锁的功能,从而使别人加的锁失效。

    @RequestMapping("/reduce_stock")
public String reduceStock() {

//key的名称
String lockKey = "lock";

try {
//setnx key value 加锁逻辑
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if (!aBoolean) {
return "fail";
}

//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}
} catch (Exception e) {
e.printStackTrace();
} finally {

// del key 释放锁逻辑
stringRedisTemplate.delete(lockKey);
}


return "helloworld";
}

 三代代码

分析:

--优点:解决了锁失效问题

--问题:没有解决因为宕机而引起的死锁,如下图所示,微服务8082获取锁后在执行业务逻辑时系统宕机后就会引起死锁

基于redis的分布式锁_分布式锁_05

    @RequestMapping("/reduce_stock")
public String reduceStock() {

//key的名称
String lockKey = "lock";


//setnx key value 加锁逻辑
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1");
if (!aBoolean) {
return "fail";
}

try {


//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}
} catch (Exception e) {
e.printStackTrace();
} finally {

// del key 释放锁逻辑
stringRedisTemplate.delete(lockKey);
}


return "helloworld";
}

四代代码

分析:

--优点:加锁逻辑时设置过期时间,可以解决三代代码的死锁问题,系统中断了我到时间就自动释放锁

--问题:我设置的时间是30秒(随机想的一个数),假设我的业务逻辑是35,那会引起锁失效。如下图所示

基于redis的分布式锁_分布式锁_06

@RequestMapping("/reduce_stock")
public String reduceStock() {

//key的名称
String lockKey = "lock";


//setnx key value 加锁逻辑
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, "1",30, TimeUnit.SECONDS);
if (!aBoolean) {
return "fail";
}

try {


//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}
} catch (Exception e) {
e.printStackTrace();
} finally {

// del key 释放锁逻辑
stringRedisTemplate.delete(lockKey);
}


return "helloworld";
}

 五代代码

分析:

--优点:解决了四代代码的锁失效问题

--缺点:如下图所示,如果我设置失效时间是30,而我业务逻辑时间是35,在30-35之间是有两个线程同时访问,这与独占锁是矛盾的,所以此处存在问题。

基于redis的分布式锁_基于redis的分布式锁_07

@RequestMapping("/reduce_stock")
public String reduceStock() {

//key的名称
String lockKey = "lock";

//value的值
String clientId = UUID.randomUUID().toString();


//setnx key value 加锁逻辑
Boolean aBoolean = stringRedisTemplate.opsForValue().setIfAbsent(lockKey, clientId, 30, TimeUnit.SECONDS);
if (!aBoolean) {
return "fail";
}

try {


//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}
} catch (Exception e) {
e.printStackTrace();
} finally {

//只能释放自己加的锁
if (clientId.equals(stringRedisTemplate.opsForValue().get("lock"))) {
// del key 释放锁逻辑
stringRedisTemplate.delete(lockKey);
}
}


return "helloworld";
}

瓶颈

我们现在的瓶颈就是超时时间的设置。

如果设置短了会出现五代代码的问题;如果设置长了,你不能保证业务逻辑一定会比你设置的时间短,就算你设置的时间长,10分钟,那万一系统中断10分钟内不能有业务处理,也是不可取的。

如果我们能动态修改这个超时时间,那就无敌了 

其实还有一个问题,这短代码的逻辑是获取锁失败后直接返回,其实应该继续尝试获取

基于redisson的分布式锁

原理

基于redis的分布式锁_redis_08

实践

<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.10.0</version>
</dependency>

@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://127.10.30.146:6379").setDatabase(0).setPassword("123456");
return (Redisson) Redisson.create(config);
}

@RestController
public class DistributedLockController {

@Autowired
private StringRedisTemplate stringRedisTemplate;

@Autowired
private Redisson redisson;


@RequestMapping("/reduce_stock")
public String reduceStock() {

//key的名称
String lockKey = "lock";

RLock lock = redisson.getLock(lockKey);

lock.lock();
try {
//查数据库(redis)中库存数量
Integer stock = Integer.parseInt(stringRedisTemplate.opsForValue().get("stock"));
//判断库存
if (stock > 0) {
System.out.println("消费库存成功--->" + stock);
//更新库存
stock = stock - 1;
stringRedisTemplate.opsForValue().set("stock", String.valueOf(stock));
} else {
System.out.println("消费库存失败。。。");
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}


return "helloworld";
}


}

问题

向redis集群写数据的步骤是: 

1)向master节点写数据

2) master节点返回

3)master节点同步到子节点

如果 线程t1 获取锁,写入一个数据    1)  2)成功后 此时master 节点掉线了, 在子节点中选一个master,但是这个master是没有t1写的数据,此时此刻t2是可以获取到锁的,这个是redis做分布式锁的瑕疵。

redis是高性能分布式锁,zk是高可靠分布式锁,看你看重性能还是一致性了。

压测工具

Apache JMeter  ​​Apache JMeter - Apache JMeter™​​

安装  ​​jmeter(一)安装与环境配置 - 远景style -​​

使用 ​​Jmeter压力测试工具安装及使用教程 - roundlight -​​

参考

​​https://ke.qq.com/course/455755?term_id=100545359&taid=4035731660403787​​

举报

相关推荐

0 条评论