一、Redis锁
-
多个客户端,通过watch一个键-值,然后开启事务
-
如果在开启事务的期间,watch的值没有被其他客户端修改过,则执行成功
-
如果在开启事务的期间,watch的值被其他客户端修改了,则执行失败
set name erick
set age 10watch name age # 监控对应的属性
multi
set address shanxi
set year 2022
exec如果事务在执行期间,没有其他客户端去操作被watch的值,则A客户端的事物可以执行成功
如果想解除监控,unwatch
watch必须在开启事务之前
二、Redis分布式锁
-
满足分布式系统或集群模式下,多进程可见并互斥的锁
多线程可见
互斥
高可用
高并发
安全
1. Redis锁
-
基础版本
获取锁:
1. 获取锁,单线程保证一定只有一个线程来执行
SETNX key value
释放锁
redis.del(LOCK_NAME)2. 如果加锁后,还没来的及释放锁, redis挂了,就可能永远不能释放锁
因此,对于锁的k-v,要加上过期时间
3. 如果在加锁和释放锁期间redis挂了,依然没办法,因此redis提供了一个原子性的指令
SET lock value EX 10 NX
4. 非阻塞式的,只会试一次,成功了就返回ok,失败了就返回nil
package com.erick;
import redis.clients.jedis.Jedis;
public class Test {
private static String LOCK_NAME = "LOCK"; /*可以将value替换为Thread的名字*/ private static String LOCK_VALUE = "ERICK"; private static int EXPIRE_SECS = 5; public static void main(String[] args) { thirdLock(); } /*假如执行完业务后,redis还没来的及释放锁,redis就挂了*/ private static void firstLock() { Jedis redis = getRedis(); /*获取锁,单线程保证一定只有一个线程来执行*/ Long result = redis.setnx(LOCK_NAME, LOCK_VALUE); if (result == 1) { executeBusiness(); /*释放锁*/ redis.del(LOCK_NAME); } else { /*等待*/ } } /*假如在上锁和设置过期时间内,redis挂了,锁依然无法释放 * 因此在设置时候,必须要将加锁和设置过期时间,做成原子性的*/ private static void secondLock() { Jedis redis = getRedis(); Long result = redis.setnx(LOCK_NAME, LOCK_VALUE); /*加锁的时候,加上过期时间*/ redis.setex(LOCK_NAME, EXPIRE_SECS, ""); if (result == 1) { executeBusiness(); /*释放锁*/ redis.del(LOCK_NAME); } else { /*等待*/ } } private static void thirdLock() { Jedis redis = getRedis(); /*是一条原子指令*/ String result = redis.set(LOCK_NAME, LOCK_VALUE, "NX", "EX", EXPIRE_SECS); if ("OK".equalsIgnoreCase(result)) { executeBusiness(); /*释放锁*/ //redis.del(LOCK_NAME); } else { /*等待*/ } } private static void executeBusiness() { System.out.println("执行业务"); } private static Jedis getRedis() { Jedis jedis = new Jedis("60.205.229.31", 6380); return jedis; }
}
2. 增强版本
-
上面的分布式锁存在的问题
-
最简单的方法:将超时时间设置的长一些,远大于业务执行的时间,但是会带来性能问题
解决方法:
锁的key-value:
- 其中key可以用业务名称来表示
- value用uuid来表示
2.1 删除锁的时候,先通过value来判断锁是不是自己线程的
2.2 如果是,则删除,如果不是,就不要删除
package com.erick.redis;
import redis.clients.jedis.Jedis;
import java.util.UUID;
import java.util.concurrent.TimeUnit;public class Test02 {
public static void main(String[] args) {
new Thread(() -> businessWithLock()).start();
new Thread(() -> businessWithLock()).start();
}private static void businessWithLock() { Jedis redis = getRedis(); String lockName = "LOCK_BUSINESS"; /*锁的值,用threadId和uuid来表示,避免了多个线程*/ String lockValue = Thread.currentThread().getId() + UUID.randomUUID().toString(); String result = redis.set(lockName, lockValue, "NX", "EX", 5); if ("OK".equalsIgnoreCase(result)) { executeBusiness(); } else { return; } /*删除的逻辑 * 1. 将当前的key的值拿过来与自己之前存的来比较 * 2. 如果相同,则删除,如果不同,则什么都不要做*/ String presentLockValue = redis.get(lockName); if (lockValue.equals(presentLockValue)) { System.out.println("删除锁"); redis.del(lockName); } else { System.out.println("业务执行完了,但是当前锁已经不是自己的,所以不用删除"); } } /*耗时业务*/ private static void executeBusiness() { try { System.out.println("doing business"); TimeUnit.SECONDS.sleep(20); } catch (InterruptedException e) { e.printStackTrace(); } } private static Jedis getRedis() { Jedis jedis = new Jedis("60.205.229.31", 6380); return jedis; }
}
3. Lua脚本
3.1 问题
- 判断锁是否能释放,和锁真正释放的代码中间,假如存在full gc,那么就会依然出现问题
- 又一次出现了并发修改的问题
- 因此,判断锁是否该释放锁和释放锁,应该做成一个原子性的动作
-
Redis提供了Lua脚本功能,在一个脚本中编写多条Redis命令,确保多条命令的原子性
set: 表示redis的命令
KEYS[1]: redis的key值个数
ARGV[1]: redis的value的值个数
1: 具体包含几个值
age: 实际传递的key值
20: 实际传递的value值EVAL “return redis.call(‘set’,KEYS[1],ARGV[1])” 1 age 20
3.2 lua脚本解决
-- 获取锁中的线程标示,动态传递参数
local keyName = redis.call('get',KEYS[1])
-- 比较线程标示与锁中的是否一直
if (ARGV[1] == keyName) then
-- 释放锁
redis.call('del',KEYS[1])
return 1
-- 如果不一致,则返回结果为0
else
return 0
end
private static boolean deleteLockIfMy(Jedis redis, String lockKey, String lockValue) {
/*用lua脚本来保证*/
String luaScript = "-- 获取锁中的线程标示,动态传递参数
" +
"local keyName = redis.call('get',KEYS[1])
" +
"
" +
"-- 比较线程标示与锁中的是否一直
" +
"if (keyName == ARGV[1]) then
" +
" -- 释放锁
" +
" redis.call('del',KEYS[1])
" +
" return 1
" +
" -- 如果不一致,则返回结果为0
" +
"else
" +
" return 0
" +
"end";
/*加载脚本*/
String script = redis.scriptLoad(luaScript);
/*向脚本中传递参数*/
Object delResult = redis.evalsha(script, Arrays.asList(lockKey), Arrays.asList(lockValue));
/*上面的结果是Long类型*/
if (delResult.equals(1L)) {
return true;
} else {
return false;
}
}
4. 存在的问题
三、Redisson
-
一个用来进行分布式锁的工具类
org.redisson redisson 3.16.8
1. 基本Demo
package com.erick.redis;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;
import java.util.concurrent.TimeUnit;
public class Test03 {
public static void main(String[] args) {
new Thread(() -> lockWithBusiness()).start();
new Thread(() -> lockWithBusiness()).start();
}
private static void lockWithBusiness() {
RedissonClient redissonClient = redissonClient();
/*获取对应的key的锁*/
String lockKey = "COMMERCE-BUSINESS";
RLock lock = redissonClient.getLock(lockKey);
try {
// 内部包含重试时间
/*参数一: 最长等待时间
* 参数二:锁超时释放时间
* 参数三:时间单位*/
boolean hasLok = lock.tryLock();
if (hasLok) {
business();
} else {
System.out.println("暂时没有获取到锁");
}
} finally {
/*第二个锁释放的时候,会报 删除异常
* IllegalMonitorStateException*/
lock.unlock();
}
}
/*等待超时的锁*/
private static void lockWithBusinessWithTime() {
RedissonClient redissonClient = redissonClient();
/*获取对应的key的锁*/
String lockKey = "COMMERCE-BUSINESS";
RLock lock = redissonClient.getLock(lockKey);
try {
// 内部包含重试时间
/*参数一: 最长等待时间
* 参数二:锁超时释放时间
* 参数三:时间单位*/
boolean hasLok = lock.tryLock(6, 20, TimeUnit.SECONDS);
if (hasLok) {
business();
} else {
System.out.println("暂时没有获取到锁");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
// 最长等待时间获取不到,删除的时候依然会报错
lock.unlock();
}
}
private static void business() {
System.out.println("执行业务");
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/*Redis的配置类*/
private static RedissonClient redissonClient() {
Config config = new Config();
config.useSingleServer().setAddress("redis://60.205.229.31:6380");
return Redisson.create(config);
}
}
2. 可重入性
2.1 不可重入锁
2.2 可重入锁
- 存储的键值对用Hash结构来保存
- 为了保证多条命令的原子性,必须采取lua脚本来做
2.3 lua脚本
3. 重试机制
- 通过等待时间结合,发布以及订阅模式来实现
4. 主从一致性
4.1 问题
4.2 联锁
-
解决方案:设立多个redis作为主节点
-
只有每个都获取成功的时候,才会去执行
package com.erick.redis;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;import java.util.concurrent.TimeUnit;
public class Test04 {
public static void main(String[] args) {
businessWithLock();
}private static void businessWithLock() { String lockKey = "BUSINESS"; RedissonClient firstClient = redissonClient01(); RedissonClient secondClient = redissonClient02(); RedissonClient thirdClient = redissonClient03(); RLock firstLock = firstClient.getLock(lockKey); RLock secondLock = secondClient.getLock(lockKey); RLock thirdLock = thirdClient.getLock(lockKey); /*获取到多把锁*/ RLock multiLock = firstClient.getMultiLock(firstLock, secondLock, thirdLock); boolean hasLock = multiLock.tryLock(); try{ if (hasLock) { business(); } else { System.out.println("未获取到锁,业务没有执行"); } }finally { multiLock.unlock(); } } private static void business() { System.out.println("执行业务"); try { TimeUnit.SECONDS.sleep(5); } catch (InterruptedException e) { e.printStackTrace(); } } /*Redis的配置类*/ private static RedissonClient redissonClient01() { Config config = new Config(); config.useSingleServer().setAddress("redis://60.205.229.31:6379"); return Redisson.create(config); } private static RedissonClient redissonClient02() { Config config = new Config(); config.useSingleServer().setAddress("redis://60.205.229.31:6380"); return Redisson.create(config); } private static RedissonClient redissonClient03() { Config config = new Config(); config.useSingleServer().setAddress("redis://60.205.229.31:6381"); return Redisson.create(config); }
}