0
点赞
收藏
分享

微信扫一扫

【spring】自定义二级Cache

Spring Cache是Spring-context包中提供的基于注解方式使用的缓存组件,定义了一些标准接口,通过实现这些接口,就可以通过在方法上增加注解来实现缓存。这样就能够避免缓存代码与业务处理耦合在一起的问题。

Spring Cache核心的接口就两个:Cache和CacheManager。

  • Cache接口:该接口定义提供缓存的具体操作,比如缓存的放入、读取、清理:
  • CacheManager接口:主要提供Cache实现bean的创建,每个应用里可以通过cacheName来对Cache进行隔离,每个cacheName对应一个Cache实现。

有时候由于业务需要或者Spring提供的缓存不满足我们的要求,如无法解决缓存雪崩问题,无法实现多级缓存,扩展步骤如下:

  1. 实现CacheManager接口或继承AbstractCacheManager,管理自身的cache实例,也可以直接使用内置的SimpleCacheManager。
  2. 实现Cache接口,自定义缓存实现逻辑。
  3. 将自定义的Cache和CacheManager进行关联并注入到Spring容器中。

使用二级缓存需要思考的一些问题?

我们知道关系数据库(MyqSQL)数据最终存储在磁盘上,如果每次都从数据库里去读取,会因为磁盘本身的IO影响读取速度,所以就有了像redis这种的内存缓存。

通过内存缓存确实能够很大程度的提高查询速度,但如果同一查询并发量非常的大,频繁的查询redis,那么redis的网络IO会成为性能瓶颈。那我们针对这种查询非常频繁的数据(热点key),我们是不是可以考虑存到应用内缓存?。

当应用内缓存有符合条件的数据时,就可以直接使用,而不用通过网络到redis中去获取,这样就形成了两级缓存。

  • 一级缓存:应用内缓存,也叫本地缓存,可以使用caffeine、ehcache等实现。
  • 二级缓存:远程缓存,可以使用redis、memcache等实现。

整个流程如下

流程看着是很清晰,但其实二级缓存需要考虑的点还很多。

  1. 如何保证分布式节点一级缓存的一致性?
    我们说一级缓存是应用内缓存,那么当你的项目部署在多个节点的时候,如何保证当你对某个key进行修改删除操作时,使其它节点的一级缓存一致呢?
    可以使用MQ来实现,当key失效时,发送一条key失效的消息,节点收到这个消息时使本地的key失效即可,既然用了redis缓存,redis本身是有支持订阅/发布功能的,可以使用redis的发布订阅功能来实现,不必增加第三方组件来增加系统的复杂性。

  2. 是否允许存储空值?
    这个确实是需要考虑的点。因为如果某个查询缓存和数据库中都没有,那么就会导致频繁查询数据库,这也是我们常说的缓存穿透。

但如果存储空值呢,因为可能会存储大量的空值,导致缓存变大,所以这个最好是可配置,按照业务来决定是否开启。

  1. 是否需要缓存预热?
    也就是说,我们会觉得某些key一开始就会非常的热,也就是热点数据,那么我们是否可以一开始就先存储到缓存中,避免缓存击穿。

  2. 一级缓存存储数量上限的考虑?
    既然一级缓存是应用内缓存,那你是否考虑一级缓存存储的数据给个限定最大值,避免存储太多的一级缓存导致OOM。

Caffeine底层key支持弱引用,而value则支持弱引用和软引用。这样在OOM之前会将缓存先回收,回收完内存还不够才会OOM,当然默认是强引用。

  1. 一级缓存过期策略的考虑?
    redis作为二级缓存,redis是使用淘汰策略来管理的。具体可参考redis的8种淘汰策略。那么一级缓存的淘汰策略呢?

Caffeine有三种缓存值的清理策略:基于大小、基于时间和基于引用。

  • 基于容量:当缓存大小超过配置的大小限制时会发生回收。算法:Window TinyLfu。
  • 基于时间:写入后到期策略。访问后过期策略。到期时间由Expiry实现独自计算。
  • 基于引用:启用基于缓存键值的垃圾回收。

基于Spring Cache实现二级缓存(Caffeine+Redis)

二级缓存接口Level2Cache

package com.morris.spring.custom.cache;

import org.springframework.cache.Cache;

import java.util.concurrent.TimeUnit;

public interface Level2Cache extends Cache {
	void put(Object key, Object value, int expireTime, TimeUnit expireTimeUnit);
}

二级缓存管理接口Level2CacheManage

package com.morris.spring.custom.cache;

import org.springframework.cache.CacheManager;

public interface Level2CacheManage extends CacheManager {
	Level2Cache getCache(String name);
}

二级缓存抽象接口AbstractLevel2Cache

package com.morris.spring.custom.cache;

import org.springframework.cache.support.AbstractValueAdaptingCache;

public abstract class AbstractLevel2Cache extends AbstractValueAdaptingCache implements Level2Cache {

	protected AbstractLevel2Cache(boolean allowNullValues) {
		super(allowNullValues);
	}
}

二级缓存抽象管理接口AbstractLevel2CacheManage

package com.morris.spring.custom.cache;

import org.springframework.cache.Cache;
import org.springframework.cache.support.AbstractCacheManager;
import org.springframework.lang.Nullable;

import java.util.Collection;
import java.util.Collections;

public abstract class AbstractLevel2CacheManage extends AbstractCacheManager implements Level2CacheManage {

	@Override
	protected Collection<? extends Cache> loadCaches() {
		return Collections.emptyList();
	}

	@Override
	@Nullable
	public Level2Cache getCache(String name) {
		return (Level2Cache) super.getCache(name);
	}

	protected Level2Cache getMissingCache(String name) {
		return null;
	}
}

二级缓存实现类CaffeineRedisCache

package com.morris.spring.custom.cache;

import com.github.benmanes.caffeine.cache.Cache;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Objects;
import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;

@SuppressWarnings("unchecked")
@Slf4j
public class CaffeineRedisCache extends AbstractLevel2Cache {

	private String name;

	private RedisTemplate redisTemplate;

	private Cache<Object, Object> caffeineCache;

	private String topic;

	public CaffeineRedisCache(String name, RedisTemplate redisTemplate, Cache<Object, Object> caffeineCache, String topic) {
		super(true);
		this.name = name;
		this.redisTemplate = redisTemplate;
		this.caffeineCache = caffeineCache;
		this.topic = topic;
	}

	@Override
	public String getName() {
		return this.name;
	}

	@Override
	public CaffeineRedisCache getNativeCache() {
		return this;
	}

	@Override
	public <T> T get(Object key, Callable<T> valueLoader) {

		// @Cacheable(sync=true)才会进入这里
		ValueWrapper result = get(key);

		if (result != null) {
			return (T) result.get();
		}

		// 这里可以考虑使用分布式锁
		synchronized (key.toString().intern()) {
			result = get(key);

			if (result != null) {
				return (T) result.get();
			}

			T value;
			try {
				value = valueLoader.call();
			} catch (Exception e) {
				throw new ValueRetrievalException(key, valueLoader, e);
			}

			put(key, value);
			return value;
		}
	}

	@Override
	public void put(Object key, Object value) {
		String cacheKey = getKey(key);
		CacheObj cacheObj = new CacheObj(key, value, -1);
		redisTemplate.opsForValue().set(cacheKey, cacheObj);
		// 这里发送了消息,会导致caffeineCache.put()进去的被clear,后续可在CacheMessage带上标识,自己忽略自己发送的CacheMessage
		push(new CacheMessage(this.name, key));
		caffeineCache.put(key, cacheObj);
	}

	@Override
	public void evict(Object key) {
		// 先清除redis中缓存数据,然后清除caffeine中的缓存,避免短时间内如果先清除caffeine缓存后其他请求会再从redis里加载到caffeine中
		String cacheKey = getKey(key);
		redisTemplate.delete(cacheKey);
		push(new CacheMessage(this.name, key));
		caffeineCache.invalidate(key);
	}

	@Override
	public void clear() {
		// 先清除redis中缓存数据,然后清除caffeine中的缓存,避免短时间内如果先清除caffeine缓存后其他请求会再从redis里加载到caffeine中
		Set<Object> keys = redisTemplate.keys(this.name.concat(":*"));
		for (Object key : keys) {
			redisTemplate.delete(key);
		}
		push(new CacheMessage(this.name, null));
		caffeineCache.invalidateAll();
	}

	@Override
	protected Object lookup(Object key) {
		Object cacheKey = getKey(key);

		CacheObj cacheObj  = (CacheObj) caffeineCache.getIfPresent(key);
		if(Objects.nonNull(cacheObj) && !cacheObj.keyIsExpired()) {
			log.debug("get cache from caffeine, the key is : {}", cacheKey);
			return cacheObj.getValue();
		}

		cacheObj = (CacheObj) redisTemplate.opsForValue().get(cacheKey);
		if(Objects.nonNull(cacheObj) && !cacheObj.keyIsExpired()) {
			log.debug("get cache from redis and put in caffeine, the key is : {}", cacheKey);
			caffeineCache.put(key, cacheObj);
			return cacheObj.getValue();
		}
		return null;
	}

	private String getKey(Object key) {
		return this.name.concat("::").concat(key.toString());
	}

	/**
	 * 缓存变更时通知其他节点清理本地缓存
	 *
	 * @param message
	 */
	private void push(CacheMessage message) {
		redisTemplate.convertAndSend(topic, message);
	}

	/**
	 * 清理本地缓存
	 *
	 * @param key
	 */
	public void clearLocal(Object key) {
		log.debug("clear local cache, the key is : {}", key);
		if (key == null) {
			caffeineCache.invalidateAll();
		} else {
			caffeineCache.invalidate(key);
		}
	}

	@Override
	public void put(Object key, Object value, int expireTime, TimeUnit expireTimeUnit) {
		String cacheKey = getKey(key);
		System.out.println(System.currentTimeMillis());
		long expireAt = System.currentTimeMillis() + expireTimeUnit.toMillis(expireTime);
		CacheObj cacheObj = new CacheObj(key, value, expireAt);
		caffeineCache.put(key, cacheObj);
		redisTemplate.opsForValue().set(cacheKey, cacheObj, expireTime, expireTimeUnit);
		push(new CacheMessage(this.name, key));
	}
}

二级缓存管理实现类CaffeineRedisCacheManage

package com.morris.spring.custom.cache;

import com.github.benmanes.caffeine.cache.Caffeine;
import org.springframework.cache.Cache;
import org.springframework.cache.support.AbstractCacheManager;
import org.springframework.data.redis.core.RedisTemplate;

import java.util.Collection;
import java.util.Collections;
import java.util.Objects;

public class CaffeineRedisCacheManage extends AbstractLevel2CacheManage {

	private RedisTemplate redisTemplate;

	private String topic;

	public CaffeineRedisCacheManage(RedisTemplate redisTemplate, String topic) {
		this.redisTemplate = redisTemplate;
		this.topic = topic;
	}

	@Override
	protected Level2Cache getMissingCache(String name) {
		return new CaffeineRedisCache(name, redisTemplate, caffeineCache(), topic);
	}

	private com.github.benmanes.caffeine.cache.Cache<Object, Object> caffeineCache() {
		Caffeine<Object, Object> cacheBuilder = Caffeine.newBuilder();
		cacheBuilder.initialCapacity(100);
		cacheBuilder.maximumSize(1_0000);
		// cacheBuilder.expireAfterAccess(60, TimeUnit.SECONDS);
		// cacheBuilder.expireAfterWrite(60, TimeUnit.SECONDS);
		// cacheBuilder.refreshAfterWrite(60, TimeUnit.SECONDS);
		return cacheBuilder.build();

	}

	public void clearLocal(String cacheName, Object key) {
		Cache cache = getCache(cacheName);
		if (Objects.isNull(cache)) {
			return;
		}

		CaffeineRedisCache redisCaffeineCache = (CaffeineRedisCache) cache;
		redisCaffeineCache.clearLocal(key);
	}
}

带超时时间缓存对象CacheObj

一级缓存和二级缓存中值存放的对象都是CacheObj对象,里面封装了具体的key、value和超时时间。

package com.morris.spring.custom.cache;

import lombok.Data;

import java.io.Serializable;
import java.util.Date;

/**
 * 超时时间缓存对象
 *
 * @author chenl346
 */
@Data
public class CacheObj<K, V> implements Serializable {
    private static final long serialVersionUID = 1L;

    protected K key;
    protected V value;
    protected long expireTime; // 缓存失效时间

    protected CacheObj() {
    }

    protected CacheObj(K key, V value, long expireTime) {
        this.key = key;
        this.value = value;
        this.expireTime = expireTime;
    }

    protected CacheObj(K key, V value, Date expireTime) {
        this.key = key;
        this.value = value;
        this.expireTime = expireTime.getTime();
    }

    /**
     * 判断key是否失效
     * @return
     */
    boolean keyIsExpired() {
		if(expireTime == -1) {
			return false;
		}
		System.out.println(expireTime + ":" + System.currentTimeMillis());
        return expireTime <= System.currentTimeMillis();
    }
}

缓存消息对象CacheMessage

key失效后,向redis中发送的消息对象。

package com.morris.spring.custom.cache;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;

/**
 * 缓存消息对象
 */
@Data
@AllArgsConstructor
@NoArgsConstructor
public class CacheMessage implements Serializable {

    private String cacheName;
    private Object key;

}

redis缓存失效监听器CacheMessageListener

package com.morris.spring.custom.cache;

import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.connection.Message;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.core.RedisTemplate;

/**
 * 缓存失效监听器
 */
@Slf4j
public class CacheMessageListener implements MessageListener {

	private RedisTemplate<Object, Object> redisTemplate;
	
	private CaffeineRedisCacheManage caffeineRedisCacheManage;

	public CacheMessageListener(RedisTemplate<Object, Object> redisTemplate,
								CaffeineRedisCacheManage caffeineRedisCacheManage) {
		this.redisTemplate = redisTemplate;
		this.caffeineRedisCacheManage = caffeineRedisCacheManage;
	}

	@Override
	public void onMessage(Message message, byte[] pattern) {
		CacheMessage cacheMessage = (CacheMessage) redisTemplate.getValueSerializer().deserialize(message.getBody());
		log.info("receive a redis topic message, clear local cache, the cacheName is {}, the key is {}", cacheMessage.getCacheName(), cacheMessage.getKey());
		caffeineRedisCacheManage.clearLocal(cacheMessage.getCacheName(), cacheMessage.getKey());
	}

}

配置类

package com.morris.spring.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.JsonTypeInfo;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.impl.LaissezFaireSubTypeValidator;
import com.morris.spring.custom.cache.CacheMessageListener;
import com.morris.spring.custom.cache.CaffeineRedisCacheManage;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cache.annotation.EnableCaching;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.PropertySource;
import org.springframework.data.redis.connection.RedisStandaloneConfiguration;
import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@PropertySource("classpath:redis.properties")
@Configuration
@EnableCaching // 开启缓存注解
public class CustomCacheConfig {

	@Value("${redis.host}")
	private String hostName;

	@Value("${redis.port}")
	private Integer port;

	@Value("${redis.password}")
	private String password;

	private String topic = "cache:redis:caffeine:topic";

	@Bean
	public RedisTemplate redisTemplate() {
		RedisTemplate<Object, Object> template = new RedisTemplate<>();
		template.setConnectionFactory(jedisConnectionFactory());
		//使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值
		Jackson2JsonRedisSerializer<Object> serializer = new Jackson2JsonRedisSerializer<>(Object.class);
		ObjectMapper mapper = new ObjectMapper();
		mapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
		// 序列化对象中带上class信息
		mapper.activateDefaultTyping(LaissezFaireSubTypeValidator.instance, ObjectMapper.DefaultTyping.NON_FINAL, JsonTypeInfo.As.WRAPPER_ARRAY);
		serializer.setObjectMapper(mapper);
		template.setValueSerializer(serializer);
		//使用StringRedisSerializer来序列化和反序列化redis的key值
		template.setKeySerializer(new StringRedisSerializer());
		template.setHashValueSerializer(serializer);
		template.setHashKeySerializer(new StringRedisSerializer());
		return template;
	}

	@Bean
	public CaffeineRedisCacheManage caffeineRedisCacheManage() {
		return new CaffeineRedisCacheManage(redisTemplate(), topic);
	}

	@Bean
	public RedisMessageListenerContainer redisMessageListenerContainer(RedisTemplate<Object, Object> redisTemplate) {
		RedisMessageListenerContainer redisMessageListenerContainer = new RedisMessageListenerContainer();
		redisMessageListenerContainer.setConnectionFactory(jedisConnectionFactory());
		CacheMessageListener cacheMessageListener = new CacheMessageListener(redisTemplate, caffeineRedisCacheManage());
		redisMessageListenerContainer.addMessageListener(cacheMessageListener, new ChannelTopic(topic));
		return redisMessageListenerContainer;
	}

	@Bean
	public JedisConnectionFactory jedisConnectionFactory() {
		RedisStandaloneConfiguration redisStandaloneConfiguration = new RedisStandaloneConfiguration(hostName, port);
		redisStandaloneConfiguration.setPassword(password);
		JedisConnectionFactory jedisConnectionFactory = new JedisConnectionFactory(redisStandaloneConfiguration);
		return jedisConnectionFactory;
	}
}

总结

  • 这里扩展了Cache和CacheManage接口,只是为后面自定义缓存注解做准备,这里实际上是不需要的,用不到。
  • 一级缓存和二级缓存中存的是CacheObj对象,也是为后面自定义缓存注解做准备,能自定义缓存失效时间,这里实际上是不需要的,用不到。
  • 可在此基础上再次封装,如caffeine的一些配置,redis失效时channel,封装为starter,参数基于yaml配置。

缺点:

  • 调用cache.put()时,发送了消息CacheMessage,然后自己收到了CacheMessage,会导致将cache.put()进去的被clear,后续可在CacheMessage带上标识,忽略自己发送的CacheMessage
  • cache.get(Object key, Callable valueLoader)暂时使用synchronized来解决缓存击穿,后续可使用redis的分布式锁来实现。

开源实现

  1. 阿里巴巴jetcache: https://github.com/alibaba/jetcache
  2. J2Cache: https://gitee.com/ld/J2Cache
  3. l2cache: https://github.com/ck-jesse/l2cache
举报

相关推荐

0 条评论