https://blog.csdn.net/weixin_47409774/article/details/123505651
缓存
SpringBoot整合Redis
1.导入依赖
<!-- 引入redis --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> </dependency> 12345
2.配置Redis
redis: host: 127.0.0.1 password: mima port: 6379
3.在业务中使用springboot自动配置好的RedisTemplate操作Redis
@Test public void testStringRedisTemplate(){ //往Redis中存入key ValueOperations<String, String> ops = stringRedisTemplate.opsForValue(); //保存数据 ops.set("hello", "word_"+ UUID.randomUUID().toString()); //查询 String hello = ops.get("hello"); System.out.println("redis:"+hello); } 1234567891011
4.进行压测
线程数:50 出现对外内存溢出异常
原因
-
Springboot2.0以后默认会使用lettuce作为操作Redis的客户端
-
lettuce要使用redis跟redis建立连接使用的是netty
-
lettuce的bug导致内存溢出 -Xmx300m 如果没有指定堆外内存,默认使用-Xmx300m 作为对外内存
-
内存没有得到及时的释放,可以通过
-Dio.netty.maxDirectMemory
进行设置
解决: 不能只去使用 -Dio.netty.maxDirectMemory 调大堆外内存
-
升级lettuce 客户端
-
切换使用Jedis(使用此种方法) 排除加载lettuce
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-data-redis</artifactId> <exclusions> <exclusion> <groupId>io.lettuce</groupId> <artifactId>lettuce-core</artifactId> </exclusion> </exclusions> </dependency> 12345678910
引入jedis 版本由springboot控制
<jedis.version>2.9.3</jedis.version> 1 <dependency> <groupId>redis.clients</groupId> <artifactId>jedis</artifactId> </dependency> 1234
高并发系统下缓存失效带来的问题
1.缓存穿透
缓存穿透是指查询一个一定不存在的数据,由于缓存是不命中,将去查询数据库,但是数 据库也无此记录,我们没有将这次查询的null 写入缓存,这将导致这个不存在的数据每次 请求都要到存储层去查询,失去了缓存的意义。 在流量大时,可能DB 就挂掉了,要是有人利用不存在的key 频繁攻击我们的应用,这就是 漏洞。 解决:
2. 缓存雪崩
缓存雪崩是指在我们设置缓存时采用了相同的过期时间,导致缓存在某一时刻同时失 效,请求全部转发到DB,DB 瞬时压力过重雪崩。 解决:
3、缓存击穿
对于一些设置了过期时间的key,如果这些key 可能会在某些时间点被超高并发地访问, 是一种非常“热点”的数据。 这个时候,需要考虑一个问题:如果这个key 在大量请求同时进来前正好失效,那么所 有对这个key 的数据查询都落到db,我们称为缓存击穿。 解决:
单体应用加锁
加锁方式:将代码放入同步代码块 只要是同一把锁,就能锁住,需要这个锁的所有线程 1.使用this当前对象加锁,SpringBoot所有的组件在容器中都是单例的,相当于有多少请求都会用同一个this,是可以的
synchronized (this){ //得到锁以后应该再去缓存中确定一次,如果没有才需要继续查询 String catalogJSON = redisTemplate.opsForValue().get("catalogJSON"); //如果不是空的直接返回 if (!StringUtils.isEmpty(catalogJSON)){ //缓存不为空直接返回 Map<String, List<Catelog2Vo>> result = JSON.parseObject(catalogJSON, new TypeReference<Map<String, List<Catelog2Vo>>>(){}); return result; } //执行查询数据库 ...... } 12345678910111213
分布式应用加锁(核心:原子加锁,原子解锁)
测试 本地锁在分布式情况下会产生什么问题? 在用过gateway负载均衡路由到服务上时 每个服务都会有一次查询请求
1.使用redis的
SET key value [EX seconds] [PX milliseconds] [NX|XX]
是一种用 Redis 来实现锁机制的简单方法 EX seconds – 设置键key的过期时间,单位时秒 PX milliseconds – 设置键key的过期时间,单位时毫秒 NX – 只有键key不存在的时候才会设置key的值 XX – 只有键key存在的时候才会设置key的值
如果上述命令返回OK,那么客户端就可以获得锁(如果上述命令返回Nil,那么客户端可以在一段时间之后重新尝试),并且可以通过DEL命令来释放锁。 客户端加锁之后,如果没有主动释放,会在过期时间之后自动释放。
1. 如果在执行业务代码之后没有删除锁怎么办? 给锁设置超时时间,就算代码没有删除锁,redis也会自动删除锁
//1.占分布式锁 去redis占坑 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111"); if (lock) { //加锁成功....执行业务 //2.设置过期时间,到期自动删除锁 redisTemplate.expire("lock", 30, TimeUnit.SECONDS); Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB(); //删除锁 redisTemplate.delete("lock"); return dataFromDB; } else { //加锁失败...重试...一直重试 称之为自旋锁 //休眠100ms进行重试 return getCatalogJsonFromDbWithRedisLock(); } 12345678910111213141516
2.如果抢占锁成功了,但是由于各种原因没有成功设置超时时间,造成死锁 如果占锁和设置超时时间是一个原子操作,占锁的同时加上过期时间EX seconds – 设置键key的过期时间,单位时秒
//1.占分布式锁 去redis占坑 并设置过期时间,到期自动删除锁 Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", "111",300, TimeUnit.SECONDS); if (lock) { //加锁成功....执行业务 Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB(); //删除锁 redisTemplate.delete("lock"); return dataFromDB; } else { //加锁失败...重试...一直重试 称之为自旋锁 //休眠100ms进行重试 return getCatalogJsonFromDbWithRedisLock(); } 1234567891011121314
3. 在设置完过期时间后,如果执行业务代码时间过长,再去删锁 锁因为超时时间已经删除,可能就会去删一个不存在的锁。 假如说第一个线程在执行到10秒的时候自己锁已经过期了,这时候第二个线程又抢占了这个锁再去执行业务代码,而此时第一个线程的业务代码执行完毕,把第二个线程正在使用的锁给删除了 指定值为uuid 删锁的时候匹配成功才去删
//1.占分布式锁 去redis占坑 并设置过期时间,到期自动删除锁 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid,300, TimeUnit.SECONDS); if (lock) { //加锁成功....执行业务 Map<String, List<Catelog2Vo>> dataFromDB = getDataFromDB(); //先去查一下 String lockValue = redisTemplate.opsForValue().get("lock"); //如果值相同 if (uuid.equals(lockValue)){ //删除自己的锁 redisTemplate.delete("lock"); } return dataFromDB; } else { //加锁失败...重试...一直重试 称之为自旋锁 //休眠100ms进行重试 return getCatalogJsonFromDbWithRedisLock(); } 1234567891011121314151617181920
4. 如果在判断uuid是否为当前锁的时候,锁已经过期,这时候别的线程已经设置了新的值,这时候删除的是别人的锁,获取值对比+值相同删除=原子操作
使用 lua 脚本解锁
//1.占分布式锁 去redis占坑 并设置过期时间,到期自动删除锁 String uuid = UUID.randomUUID().toString(); Boolean lock = redisTemplate.opsForValue().setIfAbsent("lock", uuid, 300, TimeUnit.SECONDS); if (lock) { System.out.println("获取分布式锁成功"); Map<String, List<Catelog2Vo>> dataFromDB; try{ //加锁成功....执行业务 dataFromDB = getDataFromDB(); }finally { //获取值对比+值相同删除=原子操作 String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; //删除锁 Long lock1 = redisTemplate.execute(new DefaultRedisScript<Long>(script, Long.class), Arrays.asList("lock"), uuid); } return dataFromDB; } else { //加锁失败...重试...一直重试 称之为自旋锁 //休眠100ms进行重试 System.out.println("获取分布式锁失败....等待重试"); try { Thread.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } return getCatalogJsonFromDbWithRedisLock(); } 123456789101112131415161718192021222324252627282930
Redisson分布式锁
Redisson 是架设在Redis 基础上的一个Java 驻内存数据网格(In-Memory Data Grid)。充分 的利用了Redis 键值数据库提供的一系列优势,基于Java 实用工具包中常用接口,为使用者 提供了一系列具有分布式特性的常用工具类。使得原本作为协调单机多线程并发程序的工 具包获得了协调分布式多机多线程并发系统的能力,大大降低了设计和研发大规模分布式 系统的难度。同时结合各富特色的分布式服务,更进一步简化了分布式环境中程序相互之间 的协作。
1. 导入依赖
<!-- 以后使用redisson作为分布式锁 分布式对象的框架 --> <dependency> <groupId>org.redisson</groupId> <artifactId>redisson</artifactId> <version>3.12.0</version> </dependency> 123456
2. 配置Redisson
程序化配置方法 Redisson程序化的配置方法是通过构建Config对象实例来实现
package cn.cloud.xmall.product.config; import org.redisson.Redisson; import org.redisson.api.RedissonClient; import org.redisson.config.Config; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.io.IOException; /** * @Description: Redisson配置类 * @author: Freedom * @QQ: 1556507698 * @date:2022/3/16 18:35 */ @Configuration public class MyRedissonConfig { /** * 所有对Redisson的使用都是通过RedissonClient * @return * @throws IOException */ @Bean(destroyMethod="shutdown") public RedissonClient redisson() throws IOException { //1、创建配置 Config config = new Config(); //指定使用单节点配置 config.useSingleServer().setAddress("redis://101.43.122.84:6379").setPassword("YourPassword"); //2、根据Config创建出RedissonClient实例 //Redis url should start with redis:// or rediss:// RedissonClient redissonClient = Redisson.create(config); return redissonClient; } } 123456789101112131415161718192021222324252627282930313233343536373839
3. 测试Redisson
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.lock
的接口
myLock.lock(10,TimeUnit.SECONDS);
//10秒钟自动解锁,自动解锁时间一定要大于业务执行时间 问题:在锁时间到了以后,不会自动续期
@ResponseBody @GetMapping("/hello") public String hello(){ //1.获取一把锁,只要锁名字相同就是同一把锁 RLock lock = redisson.getLock("my-lock"); //2.加锁 也可以指定时间 lock.lock(); //阻塞式等待 加不到锁就会一直等 //1.如果我们 try{ System.out.println("加锁成功..执行业务...."+Thread.currentThread().getId()); Thread.sleep(30000); }catch (Exception e){ }finally { //解锁 lock.unlock(); System.out.println("释放锁"+Thread.currentThread().getId()); } return "hello"; }; 12345678910111213141516171819
-
问题:如果解锁代码没有运行,Redisson会不会出现死锁 没有出现死锁问题,因为Redisson内部有一个看门狗,它能够不断的对锁续期
Redisson看门狗原理
-
手动指定时间的方法
@Override public void lock(long leaseTime, TimeUnit unit) { try { lock(leaseTime, unit, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } 12345678
-
默认指定超时时间的方法
@Override public void lock() { try { lock(-1, null, false); } catch (InterruptedException e) { throw new IllegalStateException(); } } 12345678
2.默认默认指定超时时间又调用了
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException { //获取线程id long threadId = Thread.currentThread().getId(); //尝试来获取 Long ttl = tryAcquire(leaseTime, unit, threadId); // 如果尝试获取返回null 那么会认为锁获取到了 if (ttl == null) { //直接返回 return; } RFuture<RedissonLockEntry> future = subscribe(threadId); if (interruptibly) { commandExecutor.syncSubscriptionInterrupted(future); } else { commandExecutor.syncSubscription(future); } try { //获取不到锁会调用这个死循环一直获取 while (true) { ttl = tryAcquire(leaseTime, unit, threadId); // lock acquired if (ttl == null) { //直到获取到锁 break; } // waiting for message if (ttl >= 0) { try { future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { if (interruptibly) { throw e; } future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS); } } else { if (interruptibly) { future.getNow().getLatch().acquire(); } else { future.getNow().getLatch().acquireUninterruptibly(); } } } } finally { unsubscribe(future, threadId); } // get(lockAsync(leaseTime, unit)); } 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051
-
tryAcquire(long leaseTime, TimeUnit unit, long threadId) 上一个方法调用的获取锁的方法 1
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) { return get(tryAcquireAsync(leaseTime, unit, threadId)); } 123
-
tryAcquireAsync 方法
// leaseTime 我们传入的超时时间 private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) { //如果我们传入了超时时间 即 不等于 -1 注意:如果不传入超时时间的话就是-1 if (leaseTime != -1) { //尝试使用异步方式进行加锁 return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG); } RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG); ttlRemainingFuture.onComplete((ttlRemaining, e) -> { if (e != null) { return; } // lock acquired if (ttlRemaining == null) { scheduleExpirationRenewal(threadId); } }); return ttlRemainingFuture; } 1234567891011121314151617181920
-
tryLockInnerAsync 尝试尝试使用异步方式进行加锁
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) { //先将我们传入的时间转换为内部锁的释放时间 internalLockLeaseTime = unit.toMillis(leaseTime); return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command, "if (redis.call('exists', KEYS[1]) == 0) then " + "redis.call('hset', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " + "redis.call('hincrby', KEYS[1], ARGV[2], 1); " + "redis.call('pexpire', KEYS[1], ARGV[1]); " + "return nil; " + "end; " + "return redis.call('pttl', KEYS[1]);", Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId)); } 123456789101112131415161718
最佳实践使用指定超时时间的加锁方法,这样还省掉了续期时间 1
Redisson读写锁
-
保证一定能读到最新数据,修改期间,写锁是一个排它锁(互斥锁、独享锁),读锁是一个共享锁
-
写锁没释放读锁必须等待
-
读 + 读 :相当于无锁,并发读,只会在Redis中记录好,所有当前的读锁。他们都会同时加锁成功
-
写 + 读 :必须等待写锁释放
-
写 + 写 :阻塞方式
-
读 + 写 :有读锁。写也需要等待
-
只要有读或者写的存都必须等待
@GetMapping(value = "/write") @ResponseBody public String writeValue() { String s = ""; RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); RLock rLock = readWriteLock.writeLock(); try { //1、改数据加写锁,读数据加读锁 rLock.lock(); s = UUID.randomUUID().toString(); ValueOperations<String, String> ops = stringRedisTemplate.opsForValue(); ops.set("writeValue",s); TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; } @GetMapping(value = "/read") @ResponseBody public String readValue() { String s = ""; RReadWriteLock readWriteLock = redisson.getReadWriteLock("rw-lock"); //加读锁 RLock rLock = readWriteLock.readLock(); try { rLock.lock(); ValueOperations<String, String> ops = stringRedisTemplate.opsForValue(); s = ops.get("writeValue"); try { TimeUnit.SECONDS.sleep(10); } catch (InterruptedException e) { e.printStackTrace(); } } catch (Exception e) { e.printStackTrace(); } finally { rLock.unlock(); } return s; } 123456789101112131415161718192021222324252627282930313233343536373839404142
Redisson信号量
可以用来做限流
/** * 车库停车,走了一个,停一个车 * 3车位 * 信号量也可以做分布式限流 */ @GetMapping(value = "/park") @ResponseBody public String park() throws InterruptedException { //停车请求 RSemaphore park = redisson.getSemaphore("park"); park.acquire(); //获取一个信号、获取一个值,占一个车位 阻塞方法 boolean flag = park.tryAcquire(); //尝试获取 if (flag) { //执行业务 } else { return "error"; } return "ok=>" + flag; } @GetMapping(value = "/go") @ResponseBody public String go() { RSemaphore park = redisson.getSemaphore("park"); park.release(); //释放一个车位 return "ok"; } 12345678910111213141516171819202122232425262728
Redisson闭锁
/** * 放假、锁门 * 1班没人了 * 5个班,全部走完,我们才可以锁大门 即redis中存的5变成0 * 分布式闭锁 */ @GetMapping(value = "/lockDoor") @ResponseBody public String lockDoor() throws InterruptedException { RCountDownLatch door = redisson.getCountDownLatch("door"); door.trySetCount(5); //等待5个班的人 door.await(); //等待闭锁完成 return "放假了..."; } @GetMapping(value = "/gogogo/{id}") @ResponseBody public String gogogo(@PathVariable("id") Long id) { RCountDownLatch door = redisson.getCountDownLatch("door"); door.countDown(); //计数-1 return id + "班的人都走了..."; } 12345678910111213141516171819202122232425
缓存一致性解决方案!
缓存中的数据 如何和数据库保持一致?
-
双写模式 在更新数据库中的数据时,要同时修改缓存中的数据,但是可能会出现短时间的数据不一致
-
失效模式 在修改完数据库中的数据后 ,删除掉缓存中的数据,下次再查询就会主动查询数据库更新,但是在有些情况下还是会出现脏数据问题,注意 如果是需要经常修改,经常查询的数据,应该直接读数据库,可以考虑加读写锁,使用到缓存一般都是读多写少,所以用读写锁比较好
-
无论是双写模式还是失效模式,都会导致缓存的不一致问题。即多个实例同时更新会出事。怎么办?
-
1、如果是用户纬度数据(订单数据、用户数据),这种并发几率非常小,不用考虑这个问题,缓存数据加 上过期时间,每隔一段时间触发读的主动更新即可
-
2、如果是菜单,商品介绍等基础数据,也可以去使用canal订阅binlog的方式。
-
3、缓存数据+过期时间也足够解决大部分业务对于缓存的要求。
-
4、通过加锁保证并发读写,写写的时候按顺序排好队。读读无所谓。所以适合使用读写锁。(业务不关心 脏数据,允许临时脏数据可忽略);
-
总结: 1.我们能放入缓存的数据本就不应该是实时性、一致性要求超高的。所以缓存数据的时候加上过期时间,保 证每天拿到当前最新数据即可。 2.我们不应该过度设计,增加系统的复杂性 3.遇到实时性、一致性要求高的数据,就应该查数据库,即使慢点。
缓存一致性解决-Cannl
好处就是在编码期间只考虑修改数据库,Cannl在后台自己改,缺点就是增加中间件, Cannl还可以重组我们不同架构的数据