缓存需求分析
SqlSession中可以看到Cache链
二级缓存命中
由于二级缓存时跨线程的,如果不是 提交之后才能命中会导致脏读,比如上面图示,会话2,发生修改之后直接直接填充到二级缓存中,导致此时会话1正好读取到,但是回话2由于某些原因回滚了,此时会话一读取了错误数据
上图描述了mybatis的二级缓存工作原理:查询时如果存在就直接获取不存从库中获取
修改时直接清理暂存区,提交时提交的暂存区,此时我们看看源码如何实现的。
查询
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler) throws SQLException {
BoundSql boundSql = ms.getBoundSql(parameterObject);
//获取缓存key 和 一级缓存规则一样,为啥这么说,底层调用的同一个方法
CacheKey key = createCacheKey(ms, parameterObject, rowBounds, boundSql);
return query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public <E> List<E> query(MappedStatement ms, Object parameterObject, RowBounds rowBounds, ResultHandler resultHandler, CacheKey key, BoundSql boundSql)
throws SQLException {
Cache cache = ms.getCache();
//一个cache 表示一个命名空间
if (cache != null) { //开启二级缓存
//清理暂存缓存区数据
flushCacheIfRequired(ms);
if (ms.isUseCache() && resultHandler == null) {
ensureNoOutParams(ms, boundSql);
@SuppressWarnings("unchecked")
List<E> list = (List<E>) tcm.getObject(cache, key);
if (list == null) {
//从二级缓存没有获取到直接执行BaseExecutor中的查询,从一级获取或者DB中获取
list = delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
//存放到缓存暂存区
tcm.putObject(cache, key, list); // issue #578 and #116
}
return list;
}
}
//没有开启二级缓存直接到 baseExecutor
return delegate.query(ms, parameterObject, rowBounds, resultHandler, key, boundSql);
}
@Override
public Object getObject(Object key) {
// issue #116 从二级缓存中获取数据
Object object = delegate.getObject(key);
if (object == null) { //防穿透
entriesMissedInCache.add(key);
}
// issue #146
if (clearOnCommit) { //标记缓存清空,这个标记有会话进行修改操作时,还没提交之前并没有清空二级缓存,
return null;
} else {
return object;
}
}
此时从二级缓存中获取数据,为什么即便能获取到数据,如果clearOnCommit 为true时也返回一个空,这里就和上面说的每次查询,只有提交,二级缓存才起作用的原因。
原因还是为了线程安全,避免读取脏数据:查询到时候问一下暂存区,这个数据是不是在提交时需要清理掉,如果需要清理掉我就不要了,直接去db去取了,
修改
org.apache.ibatis.executor.CachingExecutor#update
@Override
public int update(MappedStatement ms, Object parameterObject) throws SQLException {
// 清理二级缓存
flushCacheIfRequired(ms);
//baseExecutor 执行update
return delegate.update(ms, parameterObject);
}
private void flushCacheIfRequired(MappedStatement ms) {
Cache cache = ms.getCache();
if (cache != null && ms.isFlushCacheRequired()) {
//清理暂存区
tcm.clear(cache);
}
}
提交
org.apache.ibatis.executor.CachingExecutor#commit
@Override
public void commit(boolean required) throws SQLException {
// baseExecutor 正常的提交
delegate.commit(required);
// 暂存区提交
tcm.commit();
}
org.apache.ibatis.cache.decorators.TransactionalCache#commit
public void commit() {
// clearOnCommit 发生了修改之类的操作需要清除二级缓存
if (clearOnCommit) {
delegate.clear();
}
//将暂存区的缓存提交到二级缓存中
flushPendingEntries();
//暂存区恢复默认状态
reset();
}
下面是几个自己比较感兴趣的类 注释写了一下贴出来了
暂存区类 TransactionalCache 相当于一个命名空间一个暂存区
public class TransactionalCache implements Cache {
private static final Log log = LogFactory.getLog(TransactionalCache.class);
//责任连下一个结点处理
private final Cache delegate;
//暂存取的标志,表示是否需要在提交时清除
private boolean clearOnCommit;
//二级缓存中的数据
private final Map<Object, Object> entriesToAddOnCommit;
//存放未命中的key
private final Set<Object> entriesMissedInCache;
public TransactionalCache(Cache delegate) {
this.delegate = delegate;
this.clearOnCommit = false;
this.entriesToAddOnCommit = new HashMap<>();
this.entriesMissedInCache = new HashSet<>();
}
@Override
public String getId() {
return delegate.getId();
}
@Override
public int getSize() {
return delegate.getSize();
}
@Override
public Object getObject(Object key) {
// issue #116 从二级缓存中获取数据
Object object = delegate.getObject(key);
if (object == null) { //防穿透
entriesMissedInCache.add(key);
}
// issue #146
if (clearOnCommit) { //标记缓存清空,这个标记有会话进行修改操作时,还没提交之前并没有清空二级缓存,
return null;
} else {
return object;
}
}
@Override
public void putObject(Object key, Object object) {
entriesToAddOnCommit.put(key, object);
}
@Override
public Object removeObject(Object key) {
return null;
}
@Override
public void clear() {
clearOnCommit = true;
entriesToAddOnCommit.clear();
}
public void commit() {
// clearOnCommit 发生了修改之类的操作需要清除二级缓存
if (clearOnCommit) {
delegate.clear();
}
//将暂存区的缓存提交到二级缓存中
flushPendingEntries();
//暂存区恢复默认状态
reset();
}
public void rollback() {
unlockMissedEntries();
reset();
}
private void reset() {
clearOnCommit = false;
entriesToAddOnCommit.clear();
entriesMissedInCache.clear();
}
//将缓存区的数据存放到二级缓存中,包括未命中的数据
private void flushPendingEntries() {
for (Map.Entry<Object, Object> entry : entriesToAddOnCommit.entrySet()) {
delegate.putObject(entry.getKey(), entry.getValue());
}
for (Object entry : entriesMissedInCache) {
if (!entriesToAddOnCommit.containsKey(entry)) {
delegate.putObject(entry, null);
}
}
}
//清除掉二级缓存中未命中的数据
private void unlockMissedEntries() {
for (Object entry : entriesMissedInCache) {
try {
delegate.removeObject(entry);
} catch (Exception e) {
log.warn("Unexpected exception while notifying a rollback to the cache adapter. "
+ "Consider upgrading your cache adapter to the latest version. Cause: " + e);
}
}
}
}
暂存区会存在很多个提供了一个管理类:
public class TransactionalCacheManager {
private final Map<Cache, TransactionalCache> transactionalCaches = new HashMap<>();
public void clear(Cache cache) {
getTransactionalCache(cache).clear();
}
public Object getObject(Cache cache, CacheKey key) {
//先根据cache获取
return getTransactionalCache(cache).getObject(key);
}
public void putObject(Cache cache, CacheKey key, Object value) {
getTransactionalCache(cache).putObject(key, value);
}
public void commit() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.commit();
}
}
public void rollback() {
for (TransactionalCache txCache : transactionalCaches.values()) {
txCache.rollback();
}
}
private TransactionalCache getTransactionalCache(Cache cache) {
return MapUtil.computeIfAbsent(transactionalCaches, cache, TransactionalCache::new);
}
}
目前对二级缓存还有遗留了一个问题 缓存链是如何装配起来的,这部分放到mybatis环境初始化的时候再看