一、搭建测试工程项目
为了演示redis分布式锁的应用,我们需要搭建一个分布式微服务项目。架构如下:
1.1 Linux和redis的下载和安装
这里省略,可以参考这篇文章:
https://blog.csdn.net/oneby1314/article/details/113789412
1.2 搭建 SpringBoot 工程
- 新建 Module 或者 Maven 子工程
- 编写 pom.xml 管理工程依赖
- 编写 application.yml 配置文件(或者 application.properties 配置文件)
- 编写主启动类
- 编写配置类
- 编写业务类
- 代码测试
修改父工程 pom.xml 文件
由于两个子模块都继承了父工程的pom,所以子模块的pom不需要修改。
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>springboot-redis</artifactId>
<packaging>pom</packaging>
<version>1.0-SNAPSHOT</version>
<modules>
<module>redis1</module>
<module>redis2</module>
</modules>
<properties>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-actuator -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-data-redis -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.commons/commons-pool2 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-pool2</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/redis.clients/jedis -->
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>3.1.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-aop -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/org.redisson/redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.13.4</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
新建 application.properties 配置文件
server.port=1111
spring.redis.database=0
spring.redis.host=
spring.redis.port=6379
#连接池最大连接数(使用负值表示没有限制)默认8
spring.redis.lettuce.pool.max-active=8
#连接池最大阻塞等待时间(使用负值表示没有限制)默认-1
spring.redis.lettuce.pool.max-wait=-1
#连接池中的最大空闲连接默认8
spring.redis.lettuce.pool.max-idle=8
#连接池中的最小空闲连接默认0
spring.redis.lettuce.pool.min-idle=0
新建 BootRedis01Application 主启动类
/**
* @ClassName BootRedis01Application
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:54
* @Version 1.0
*/
@SpringBootApplication(exclude = {DataSourceAutoConfiguration.class})
public class BootRedis01Application {
public static void main(String[] args) {
SpringApplication.run(BootRedis01Application.class);
}
}
新建 RedisConfig 配置类,用于获取 RedisTemplate 对象
/**
* @ClassName RedisConfig
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:55
* @Version 1.0
*/
@Configuration
public class RedisConfig {
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory){
// 新建 RedisTemplate 对象,key 为 String 对象,value 为 Serializable(可序列化的)对象
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
// key 值使用字符串序列化器
redisTemplate.setKeySerializer(new StringRedisSerializer());
// value 值使用 json 序列化器
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
// 传入连接工厂
redisTemplate.setConnectionFactory(connectionFactory);
// 返回 redisTemplate 对象
return redisTemplate;
}
}
新建 GoodController 业务类,用于贩卖商品
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
}
}
boot_redis02 工程的端口号为 2222,其他配置均与 boot_redis01 相同
1.3 启动项目测试时可能遇到的问题
抛异常:Unable to connect to Redis
启动 SpringBoot 应用,二话不说直接抛了个异常:
解决问题:关闭 CentOS 防火墙
执行 systemctl stop firewalld.service 命令关闭 CentOS 防火墙,然后使用 systemctl status firewalld.service 查看防火墙状态,出现 dead 字样就表示防火墙已经关闭
抛出异常:DENIED Redis is running in protected mode because protected mode is enabled
解决问题:将 protected-mode 字段设置为 no
打开 redis.conf 配置文件,将 protected-mode 字段设置为 no
接着将 bind 127.0.0.1 的配置注释掉.。
抛异常:MISCONF Redis is configured to save RDB snapshots, but it is currently not able to persist on disk
嗨呀,好熟悉的异常,这不是之前见到过的嘛~究其原因是因为强制把 redis 快照关闭了导致不能持久化的问题,通过将 stop-writes-on-bgsave-error 字段的值设置为 no 即可避免这种问题。
**解决问题:将 stop-writes-on-bgsave-error 字段的值设置为 no **
打开 redis.conf 配置文件,将 stop-writes-on-bgsave-error 字段的值设置为 no,然后重启 redis 服务
1.4 搭建nginx
1、安装 gcc
安装 nginx 需要先将官网下载的源码进行编译,编译依赖 gcc 环境,如果没有 gcc 环境,则需要安装:
yum install gcc-c++
2、PCRE pcre-devel 安装
PCRE(Perl Compatible Regular Expressions)是一个 Perl 库,包括 perl 兼容的正则表达式库。nginx 的 http 模块使用 pcre 来解析正则表达式,所以需要在 linux 上安装 pcre 库,pcre-devel 是使用 pcre 开发的一个二次开发库。nginx也需要此库。安装命令为:
yum install -y pcre pcre-devel
3、zlib 安装
zlib 库提供了很多种压缩和解压缩的方式, nginx 使用 zlib 对 http 包的内容进行 gzip ,所以需要在 CentOS 上安装 zlib 库。
yum install -y zlib zlib-devel
4、OpenSSL 安装
OpenSSL 是一个强大的安全套接字层密码库,囊括主要的密码算法、常用的密钥和证书封装管理功能及 SSL 协议,并提供丰富的应用程序供测试或其它目的使用。nginx 不仅支持 http 协议,还支持 https(即在ssl协议上传输http),所以需要在 Centos 安装 OpenSSL 库。
yum install -y openssl openssl-devel
5、下载 nginx 安装包
使用 wget 命令下载 nginx 安装包,确保系统已经安装了wget,如果没有安装,执行 yum install wget 安装
wget -c https://nginx.org/download/nginx-1.12.0.tar.gz
6、解压安装包
使用 tar -zxvf 指令解压下载好的安装包,并进入解压后的目录
tar -zxvf nginx-1.12.0.tar.gz
7、配置 nginx
其实在 nginx-1.12.0 版本中你就不需要去配置相关东西,默认就可以了。当然,如果你要自己配置目录也是可以的。我这里采用默认配置,在 nginx 安装包目录下执行如下指令:
cd nginx-1.12.0
./configure
执行完 ./configure 命令之后会生成 Makefile 文件,我们编译安装程序就需要它
8、编译 & 安装
make 和 make install 打一套
make
make install
9、查看 nginx 安装目录
使用 whereis nginx 命令查看 nginx 的默认安装目录
首先进入 nginx 安装目录下的可执行文件存放的目录
cd /usr/local/nginx/sbin/
启动 nginx
./nginx # 启动 nginx 服务器
./nginx -s stop # 此方式相当于先查出nginx进程id再使用kill命令强制杀掉进程。
./nginx -s quit # 此方式停止步骤是待nginx进程处理任务完毕进行停止。
./nginx -s reload # 重启nginx服务
访问 http://localhost/(nginx 默认是 80 端口),如出现如下页面则说明启动成功
编写配置文件
首先进入 /usr/local/nginx/conf/,该文件中包含 nginx 的配置文件:nginx.conf
cd /usr/local/nginx/conf/
使用 vim nginx.conf 命令编辑配置文件:
#gzip on;
upstream mynginx{
server 192.168.1.6:1111 weight=1;
server 192.168.1.6:2222 weight=1;
}
server {
listen 80;
server_name localhost;
#charset koi8-r;
#access_log logs/host.access.log main;
location / {
#root html;
proxy_pass http://mynginx;
index index.html index.htm;
}
#error_page 404 /404.html;
# redirect server error pages to the static page /50x.html
#
error_page 500 502 503 504 /50x.html;
location = /50x.html {
root html;
}
# ...
重启 nginx 服务
执行 ./nginx -s reload 命令重启 nginx 服务
访问 nginx
访问 http://192.168.152.233/buy_goods 网址能够得到如下代码,我们在 nginx 配置的权重相等,默认是轮询访问
1.5 测试成功
在 Windows 浏览器中访问 http://192.168.1.6:1111/buy_goods 和 http://192.168.1.6:2222/buy_goods 能得到如下结果
通过nginx访问
二、分布式锁解决并发安全问题
2.1 单机版加锁
上面已经成功搭建了一个分布式工程项目。
问题:但讲分布式锁前,我们先考虑一个问题,假设没有搭建分布式架构,单机版的工程项目存在什么问题?
答案:显然就上面代码而言,即便只部署了一个单机版工程,在高并发的情况下,肯定会出现超卖现象。
解决:加锁,那么问题又来了,加 synchronized 锁还是 ReentrantLock 锁呢?
- synchronized:不见不散,等不到锁就会死等
- ReentrantLock:过时不候,lock.tryLock() 提供一个过时时间的参数,时间一到自动放弃锁
如何选择:根据业务需求来选,如果非要抢到锁不可,就使用 synchronized 锁;如果可以暂时放弃锁,等会再来强,就使用 ReentrantLock 锁
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 2.0
*/
@RestController
public class GoodController {
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
synchronized (this) {
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
}
}
}
- 在单机环境下,可以使用 synchronized 锁或 Lock 锁来实现。
- 但是在分布式系统中,因为竞争的线程可能不在同一个节点上(同一个 jvm 中),所以需要一个让所有进程都能访问到的锁来实现,比如 redis
或者 zookeeper 来构建; - 不同进程 jvm 层面的锁就不管用了,那么可以利用第三方的一个组件,来获取锁,未获取到锁,则阻塞当前想要运行的线程
1、jmeter 下载地址
jmeter 官网:这下载速度到猴年马月才能下载完成
清华大学镜像:推荐下载地址~~~
2、配置 jmeter
点击【bin】目录下的【jmeter.bat】批处理脚本启动 jmeter
在【Test Plan】上右击,选择【Add】–>【Threads】–>【Thread Group】,添加线程组
设置如下四个参数
- Name:线程组的名称
- Number of Threads(users):打出去的线程数量
- Ramp-up period(seconds):在多长时间内需要将这些线程打出去
- Loop Count:循环次数,选择 Infinite 表示无限重复执行
在线程组之上右击,选择【Add】–>【Sampler】–>【HTTP Request】,添加 HTTP 请求
设置如下三个参数
- Server Name or IP:服务器名称或者 IP 地址
- Port Number:访问的端口号
- Path:访问的路径
保存此 HTTP Request 后才能进行压测
点击【Start】按钮开始进行压测
3、压测结果
可以看到,相同的商品被出售两次,出现超卖现象
2.2 分布式锁版本一
在我们引入了nginx后,并将项目实现分布式部署后,上述单机版的解决方案就不管用了,这就便引出了分布式锁。
问题:分布式部署之后,单机版的锁失效,单机版的锁还是会导致超卖现象,这时就需要分布式锁
1、SET 命令
Redis具有极高的性能,且其命令对分布式锁支持友好,单个命令执行具备原子性,执行命令是单线程顺序执行具备线程安全,借助 SET 命令即可实现加锁处理
The SET command supports a set of options that modify its behavior:
- EX seconds – Set the specified expire time, in seconds.
- PX milliseconds – Set the specified expire time, in milliseconds.
- EXAT timestamp-seconds – Set the specified Unix time at which the
key will expire, in seconds. - PXAT timestamp-milliseconds – Set the specified Unix time at which
the key will expire, in milliseconds. - NX – Only set the key if it does not already exist.
- XX – Only set the key if it already exist.
- KEEPTTL – Retain the time to live associated with the key.
- GET – Return the old value stored at key, or nil when key did not
exist.
2、在代码中使用分布式锁
使用当前请求的 UUID + 线程名作为分布式锁的 value,执行 stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value) 方法尝试抢占锁,如果抢占失败,则返回值为 false;如果抢占成功,则返回值为 true。最后记得调用 stringRedisTemplate.delete(REDIS_LOCK_KEY) 方法释放分布式锁
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 2.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString()+Thread.currentThread().getName();
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value);
// 抢锁失败
if(lockFlag == false){
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
stringRedisTemplate.delete(REDIS_LOCK_KEY); // 释放分布式锁
return retStr;
}
}
3、代码测试
加上分布式锁之后,解决了超卖现象
2.3 分布式锁版本二
如果代码在执行的过程中出现异常,那么就可能无法释放锁,因此必须要在代码层面加上 finally 代码块,保证锁的释放。
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value);
// 抢锁失败
if (lockFlag == false) {
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
stringRedisTemplate.delete(REDIS_LOCK_KEY); // 释放分布式锁
}
}
}
2.4 分布式锁版本三
假设部署了微服务 jar 包的服务器挂了,代码层面根本没有走到 finally 这块,也没办法保证解锁。这个 key 没有被删除,其他微服务就一直抢不到锁,因此我们需要加入一个过期时间限定的 key
执行 stringRedisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS); 方法为分布式锁设置过期时间,保证锁的释放
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value);
// 设置过期时间为 10s
stringRedisTemplate.expire(REDIS_LOCK_KEY, 10L, TimeUnit.SECONDS);
// 抢锁失败
if (lockFlag == false) {
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
stringRedisTemplate.delete(REDIS_LOCK_KEY); // 释放分布式锁
}
}
}
2.5 分布式锁版本四
加锁与设置过期时间的操作分开了,失去了原子性,假设服务器刚刚执行了加锁操作,然后宕机了,未得执行设置过期时间,也没就办法保证解锁。
使用 stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS) 方法,在加锁的同时设置过期时间,保证这两个操作的原子性
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
// 抢锁失败
if (lockFlag == false) {
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
stringRedisTemplate.delete(REDIS_LOCK_KEY); // 释放分布式锁
}
}
}
2.6 分布式锁版本五
张冠李戴,删除了别人的锁:我们无法保证一个线程的执行时间,有可能是 10s,有可能是 20s,也有可能更长。因为执行业务的时候可能会调用其他服务,我们并不能保证其他服务的调用时间。如果设置的锁过期了,当前线程还正在执行,那么就有可能出现超卖问题,并且还有可能出现当前线程执行完成后,释放了其他线程的锁
如下图,假设进程 A 在 T2 时刻设置了一把过期时间为 30s 的锁,在 T5 时刻该锁过期被释放,在 T5 和 T6 期间,Test 这把锁已经失效了,并不能保证进程 A 业务的原子性了。于是进程 B 在 T6 时刻能够获取 Test 这把锁,但是进程 A 在 T7 时刻删除了进程 B 加的锁,进程 B 在 T8 时刻删除锁的时候就蒙蔽了,我 TM 锁呢?
注:这里先考虑只允许删除自己的锁,不允许删除别人的锁的问题;先不考虑作业时间大于预设锁的过期时间,后面会解决。
在释放锁之前,执行 value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY)) 方法判断是否为自己加的锁
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
// 抢锁失败
if (lockFlag == false) {
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
// 判断是否是自己加的锁
if(value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))){
stringRedisTemplate.delete(REDIS_LOCK_KEY); // 释放分布式锁
}
}
}
}
2.7 分布式锁版本六
在 finally 代码块中的判断与删除并不是原子操作,假设执行 if 判断的时候,这把锁还是属于当前线程,但是有可能刚执行完 if 判断,这把锁就被其他业务给释放了,还是会出现误删锁的情况
try {
// ...
}
finally {
// 判断加锁与解锁是不是同一个客户端
if (value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))){
// 若在此时,这把锁突然不是这个客户端的,则会误解锁
stringRedisTemplate.delete(REDIS_LOCK_KEY);//释放锁
}
}
使用 redis 自身事务,通过watch和unwatch实现的乐观锁,从而实现解锁安全性。
1、事务介绍
- Redis的事务是通过MULTl,EXEC,DISCARD和WATCH这四个命令来完成。
- Redis的单个命令都是原子性的,所以这里确保事务性的对象是命令集合。
- Redis将命令集合序列化并确保处于一事务的命令集合连续且不被打断的执行。
- Redis不支持回滚的操作。
2、相关命令
-
MULTI
- 用于标记事务块的开始。
- Redis会将后续的命令逐个放入队列中,然后使用EXEC命令原子化地执行这个命令序列。
- 语法:MULTI
-
EXEC
-
在一个事务中执行所有先前放入队列的命令,然后恢复正常的连接状态。
-
语法:EXEC
-
-
DISCARD
-
清除所有先前在一个事务中放入队列的命令,然后恢复正常的连接状态。
-
语法:DISCARD
-
-
WATCH
-
当某个事务需要按条件执行时,就要使用这个命令将给定的键设置为受监控的状态。
-
语法:WATCH key[key……]注:该命令可以实现redis的乐观锁
-
-
UNWATCH
-
清除所有先前为一个事务监控的键。
-
语法:UNWATCH
-
代码
开启事务不断监视 REDIS_LOCK_KEY 这把锁有没有被别人动过,如果已经被别人动过了,那么继续重新执行删除操作,否则就解除监视
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
// 抢锁失败
if (lockFlag == false) {
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
while (true) {
//加事务,乐观锁
stringRedisTemplate.watch(REDIS_LOCK_KEY);
// 判断是否是自己加的锁
if (value.equalsIgnoreCase(stringRedisTemplate.opsForValue().get(REDIS_LOCK_KEY))) {
// 开启事务
stringRedisTemplate.setEnableTransactionSupport(true);
stringRedisTemplate.multi();
stringRedisTemplate.delete(REDIS_LOCK_KEY);
// 判断事务是否执行成功,如果等于 null,就是没有删掉,删除失败,再回去 while 循环那再重新执行删除
List<Object> list = stringRedisTemplate.exec();
if (list == null) {
continue;
}
}
//如果删除成功,释放监控器,并且 break 跳出当前循环
stringRedisTemplate.unwatch();
break;
}
}
}
}
注:此版本只剩下一个问题,就是线程作业时间大于预设锁的过期时间。留到后面说
2.8 分布式锁版本七
lua脚本保证原子性操作,redis 可以通过 eval 命令保证代码执行的原子性
1、RedisUtils 工具类
getJedis() 方法用于从 jedisPool 中获取一个连接块对象
/**
* @ClassName RedisUtils
* @Description TODO
* @Author Oneby
* @Date 2021/2/4 17:41
* @Version 1.0
*/
public class RedisUtils {
private static JedisPool jedisPool;
private static String hostAddr = "192.168.152.233";
static {
JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
jedisPoolConfig.setMaxTotal(20);
jedisPoolConfig.setMaxIdle(10);
jedisPool = new JedisPool(jedisPoolConfig, hostAddr, 6379, 100000);
}
public static Jedis getJedis() throws Exception {
if (null != jedisPool) {
return jedisPool.getResource();
}
throw new Exception("Jedispool is not ok");
}
}
2、使用 lua 脚本保证解锁操作的原子性
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
try {
// setIfAbsent() 就相当于 setnx,如果不存在就新建锁,同时加上过期时间保证原子性
Boolean lockFlag = stringRedisTemplate.opsForValue().setIfAbsent(REDIS_LOCK_KEY, value, 10L, TimeUnit.SECONDS);
// 抢锁失败
if (lockFlag == false) {
return "抢锁失败 o(╥﹏╥)o";
}
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
// 获取连接对象
Jedis jedis = RedisUtils.getJedis();
// lua 脚本,摘自官网
String script = "if redis.call('get', KEYS[1]) == ARGV[1]" + "then "
+ "return redis.call('del', KEYS[1])" + "else " + " return 0 " + "end";
try {
// 执行 lua 脚本
Object result = jedis.eval(script, Collections.singletonList(REDIS_LOCK_KEY), Collections.singletonList(value));
// 获取 lua 脚本的执行结果
if ("1".equals(result.toString())) {
System.out.println("------del REDIS_LOCK_KEY success");
} else {
System.out.println("------del REDIS_LOCK_KEY error");
}
} finally {
// 关闭链接
if (null != jedis) {
jedis.close();
}
}
}
}
}
3、代码测试
使用 lua 脚本可以防止别人动我们自己的锁~~~
注:此版本仍旧剩下一个问题,就是线程作业时间大于预设锁的过期时间。留到后面说
2.9 分布式锁最终版
前面已经讲过了:我们无法保证一个业务的执行时间,有可能是 10s,有可能是 20s,也有可能更长。因为执行业务的时候可能会调用其他服务,我们并不能保证其他服务的调用时间。如果设置的锁过期了,当前业务还正在执行,那么之前设置的锁就失效了,就有可能出现超卖问题。
因此我们需要确保 redisLock 过期时间大于业务执行时间的问题,即面临如何对 Redis 分布式锁进行续期的问题
redis:保证AP,可用性和分区容错性
- redis 异步复制造成的锁丢失, 比如:主节点没来的及把刚刚 set
进来这条数据给从节点,就挂了,那么主节点和从节点的数据就不一致。此时如果集群模式下,就得上 Redisson 来解决
zookeeper:保证CP,一致性和分区容错性
- zookeeper 保持强一致性原则,对于集群中所有节点来说,要么同时更新成功,要么失败,因此使用 zookeeper
集群并不存在主从节点数据丢失的问题,但丢失了速度方面的性能
前面的众多版本,都能实现了分布锁,也能满足一些简单项目架构的需求。但是对于一些大型项目,其架构比较复杂,上述的那些版本就无能为力了, 比如redis集群环境下,我们自己写的也不能保证完全没有问题,因此我们用官方推出的Redisson 实现。
redis 分布式锁
redisson GitHub 地址
1、注入 Redisson 对象
在 RedisConfig 配置类中注入 Redisson 对象
/**
* @ClassName RedisConfig
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:55
* @Version 1.0
*/
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String redisHost;
@Bean
public RedisTemplate<String, Serializable> redisTemplate(LettuceConnectionFactory connectionFactory) {
// 新建 RedisTemplate 对象,key 为 String 对象,value 为 Serializable(可序列化的)对象
RedisTemplate<String, Serializable> redisTemplate = new RedisTemplate<>();
// key 值使用字符串序列化器
redisTemplate.setKeySerializer(new StringRedisSerializer());
// value 值使用 json 序列化器
redisTemplate.setValueSerializer(new GenericJackson2JsonRedisSerializer());
// 传入连接工厂
redisTemplate.setConnectionFactory(connectionFactory);
// 返回 redisTemplate 对象
return redisTemplate;
}
@Bean
public Redisson redisson() {
Config config = new Config();
config.useSingleServer().setAddress("redis://" + redisHost + ":6379").setDatabase(0);
return (Redisson) Redisson.create(config);
}
}
2、业务逻辑
直接 redissonLock.lock()、redissonLock.unlock() 完事儿,这尼玛就是 juc 版本的 redis 分布式锁啊
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@Autowired
private Redisson redisson;
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
// 获取锁
RLock redissonLock = redisson.getLock(REDIS_LOCK_KEY);
// 上锁
redissonLock.lock();
try {
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
// 解锁
redissonLock.unlock();
}
}
}
代码测试
在超高并发的情况下,还可能会抛出如下异常,原因是解锁 lock 的线程并不是当前线程
在释放锁之前加一个判断:还在持有锁的状态,并且是当前线程持有的锁再解锁
/**
* @ClassName GoodController
* @Description TODO
* @Author Oneby
* @Date 2021/2/2 18:59
* @Version 1.0
*/
@RestController
public class GoodController {
private static final String REDIS_LOCK_KEY = "lockOneby";
@Autowired
private StringRedisTemplate stringRedisTemplate;
@Value("${server.port}")
private String serverPort;
@Autowired
private Redisson redisson;
@GetMapping("/buy_goods")
public String buy_Goods() throws Exception {
// 当前请求的 UUID + 线程名
String value = UUID.randomUUID().toString() + Thread.currentThread().getName();
// 获取锁
RLock redissonLock = redisson.getLock(REDIS_LOCK_KEY);
// 上锁
redissonLock.lock();
try {
// 从 redis 中获取商品的剩余数量
String result = stringRedisTemplate.opsForValue().get("goods:001");
int goodsNumber = result == null ? 0 : Integer.parseInt(result);
String retStr = null;
// 商品数量大于零才能出售
if (goodsNumber > 0) {
int realNumber = goodsNumber - 1;
stringRedisTemplate.opsForValue().set("goods:001", realNumber + "");
retStr = "你已经成功秒杀商品,此时还剩余:" + realNumber + "件" + "\t 服务器端口: " + serverPort;
} else {
retStr = "商品已经售罄/活动结束/调用超时,欢迎下次光临" + "\t 服务器端口: " + serverPort;
}
System.out.println(retStr);
return retStr;
} finally {
// 还在持有锁的状态,并且是当前线程持有的锁再解锁
if (redissonLock.isLocked() && redissonLock.isHeldByCurrentThread()){
redissonLock.unlock();
}
}
}
}
三、分布式锁总结
- synchronized 锁:单机版 OK,上 nginx分布式微服务,单机锁就不 OK,
- 分布式锁:取消单机锁,上 redis 分布式锁 SETNX
- 如果出异常的话,可能无法释放锁, 必须要在 finally 代码块中释放锁
- 如果宕机了,部署了微服务代码层面根本没有走到 finally 这块,也没办法保证解锁,因此需要有设置锁的过期时间
- 除了增加过期时间之外,还必须要 SETNX 操作和设置过期时间的操作必须为原子性操作
- 规定只能自己删除自己的锁,你不能把别人的锁删除了,防止张冠李戴,可使用 lua 脚本或者事务
- 判断锁所属业务与删除锁的操作也需要是原子性操作
- redis 集群环境下,我们自己写的也不 OK,直接上 RedLock 之 Redisson 落地实现