读隔离
在数据库本地事务隔离级别读已提交(Read Committed) 或以上的基础上,
Seata(AT 模式)的默认全局隔离级别是 读未提交(Read Uncommitted) 。
原因: 分支事务在阶段一就已经提交了本地事务。此时即使其他分支事务尚未提交、全局事务尚未结束, 其他事务也能从数据库中读到该分支已提交的更新数据。从全局事务的角度看, 这些数据仍属于"未提交"的数据, 所以默认的全局隔离级别是读未提交。
如果应用在特定场景下, 必须要求全局的 读已提交 ,
目前 Seata 的方式是通过 SELECT FOR UPDATE 语句的代理。
ExecuteTemplate
// Select the executor that will run this statement, based on what the SQL recognizers found.
Executor<T> executor;
if (CollectionUtils.isEmpty(sqlRecognizers)) { // PlainExecutor is the raw JDBC executor; it contains no Seata logic
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else { // choose a different Executor depending on the SQL type
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
break;
default: // any other statement type, e.g. a plain SELECT, runs directly on the native PlainExecutor
executor = new PlainExecutor<>(statementProxy, statementCallback); // hence a plain SELECT is "read uncommitted" at the global level
break;
}
} else {
// More than one recognizer: hand the whole list to MultiExecutor.
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
}
Executor
/**
 * Contract for an object that executes something and yields a result of type {@code T}.
 *
 * @param <T> the type of the value returned by {@link #execute(Object...)}
 */
public interface Executor<T> {
/**
 * Runs the execution with the supplied arguments.
 *
 * @param args the arguments passed to the execution
 * @return the result of the execution
 * @throws Throwable if the execution fails
 */
T execute(Object... args) throws Throwable;
}
AbstractDMLBaseExecutor
InsertExecutor
继承图
接口
/**
 * An {@link Executor} for INSERT statements that can additionally report the primary-key
 * values of the inserted rows.
 *
 * @param <T> the type of the execution result
 */
public interface InsertExecutor<T> extends Executor<T> {
/**
 * Gets the primary-key values.
 *
 * @return the primary-key values, as a map to lists of values
 *         (presumably keyed by primary-key column name - TODO confirm against implementations)
 * @throws SQLException if the values cannot be obtained
 */
Map<String, List<Object>> getPkValues() throws SQLException;
/**
 * Gets the primary-key values by insert column.
 *
 * @return the primary-key values by column
 *         (presumably taken from the columns listed in the INSERT statement - TODO confirm)
 * @throws SQLException if the values cannot be obtained
 */
Map<String, List<Object>> getPkValuesByColumn() throws SQLException;
}
BaseInsertExecutor
类图
有3个不同数据库的子类
MySQLInsertExecutor、OracleInsertExecutor、PostgresqlInsertExecutor
beforeImage
实现逻辑在BaseInsertExecutor
/**
 * Produces the before-image for an INSERT.
 *
 * <p>This implementation never queries the database: it always returns an empty record set
 * created from this executor's table metadata.
 *
 * @return an empty {@code TableRecords} for the target table
 * @throws SQLException declared by the signature; not thrown directly by this body
 */
protected TableRecords beforeImage() throws SQLException {
    return TableRecords.empty(this.getTableMeta());
}
afterImage
/**
 * Produces the after-image for an INSERT from the primary-key values of the inserted rows.
 *
 * @param beforeImage the before-image; this method does not read it
 * @return the records built from the primary-key values, never {@code null}
 * @throws SQLException if no records could be built (the builder returned {@code null}),
 *         or if obtaining the primary-key values fails
 */
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    TableRecords insertedRows = buildTableRecords(getPkValues());
    if (insertedRows != null) {
        return insertedRows;
    }
    throw new SQLException("Failed to build after-image for insert");
}
UpdateExecutor
beforeImage
需要加锁FOR UPDATE
/**
 * Produces the before-image for an UPDATE by running the generated locking SELECT
 * (see {@code buildBeforeImageSQL}) against the target table.
 *
 * @return the records returned by the before-image query
 * @throws SQLException if building or running the query fails
 */
protected TableRecords beforeImage() throws SQLException {
    TableMeta tableMeta = getTableMeta();
    // Filled in while the SQL is generated, then handed to the query as its bind parameters.
    ArrayList<List<Object>> bindParams = new ArrayList<>();
    String lockingSelect = buildBeforeImageSQL(tableMeta, bindParams);
    return buildTableRecords(tableMeta, lockingSelect, bindParams);
}
/**
 * Generates the before-image query for an UPDATE:
 * {@code SELECT <columns> FROM <table> [WHERE ...] [ORDER BY ...] [LIMIT ...] FOR UPDATE}.
 *
 * <p>When {@code ONLY_CARE_UPDATE_COLUMNS} is set, the selected columns are the updated
 * columns (preceded by the primary-key columns if the update list does not contain them);
 * otherwise every column of the table is selected.
 *
 * @param tableMeta         metadata of the table being updated
 * @param paramAppenderList receives the bind parameters collected while building the
 *                          WHERE and LIMIT parts
 * @return the complete SELECT ... FOR UPDATE statement
 */
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
    SQLUpdateRecognizer updateRecognizer = (SQLUpdateRecognizer) sqlRecognizer;
    List<String> updateColumns = updateRecognizer.getUpdateColumns();

    // Build everything that follows the column list.
    StringBuilder tail = new StringBuilder(" FROM ");
    tail.append(getFromTableInSQL());

    String where = buildWhereCondition(updateRecognizer, paramAppenderList);
    if (StringUtils.isNotBlank(where)) {
        tail.append(WHERE);
        tail.append(where);
    }

    String orderByClause = updateRecognizer.getOrderBy();
    if (StringUtils.isNotBlank(orderByClause)) {
        tail.append(orderByClause);
    }

    ParametersHolder holder = null;
    if (statementProxy instanceof ParametersHolder) {
        holder = (ParametersHolder) statementProxy;
    }
    String limitClause = updateRecognizer.getLimit(holder, paramAppenderList);
    if (StringUtils.isNotBlank(limitClause)) {
        tail.append(limitClause);
    }

    // Lock the selected rows.
    tail.append(" FOR UPDATE");

    // The joiner wraps the comma-separated column list between "SELECT " and the tail.
    StringJoiner columns = new StringJoiner(", ", "SELECT ", tail.toString());
    if (!ONLY_CARE_UPDATE_COLUMNS) {
        for (String name : tableMeta.getAllColumns().keySet()) {
            columns.add(ColumnUtils.addEscape(name, getDbType()));
        }
        return columns.toString();
    }

    if (!containsPK(updateColumns)) {
        // Make sure the primary key is part of the image.
        columns.add(getColumnNamesInSQL(tableMeta.getEscapePkNameList(getDbType())));
    }
    updateColumns.forEach(columns::add);
    return columns.toString();
}
afterImage
afterImage 的查询不需要 FOR UPDATE: beforeImage 查询时已经通过 FOR UPDATE 对这些行加了锁, 而且两次查询处于同一个本地事务中。
/**
 * Produces the after-image for an UPDATE by re-reading the rows captured in the
 * before-image, looked up by primary key.
 *
 * @param beforeImage the rows captured before the update; may be {@code null} or empty
 * @return the current state of those rows, or an empty record set when
 *         {@code beforeImage} is {@code null} or empty
 * @throws SQLException if preparing or running the query fails
 */
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    TableMeta tableMeta = getTableMeta();
    boolean nothingCaptured = beforeImage == null || beforeImage.size() == 0;
    if (nothingCaptured) {
        return TableRecords.empty(getTableMeta());
    }

    String query = buildAfterImageSQL(tableMeta, beforeImage);
    ResultSet rows = null;
    try (PreparedStatement stmt = statementProxy.getConnection().prepareStatement(query)) {
        // Bind the primary-key values of the before-image rows.
        SqlGenerateUtils.setParamForPk(beforeImage.pkRows(), getTableMeta().getPrimaryKeyOnlyName(), stmt);
        rows = stmt.executeQuery();
        return TableRecords.buildRecords(tableMeta, rows);
    } finally {
        IOUtil.close(rows);
    }
}
DeleteExecutor
beforeImage
/**
 * Produces the before-image for a DELETE by running the generated locking SELECT
 * (see {@code buildBeforeImageSQL}) against the table named by the recognizer.
 *
 * @return the records returned by the before-image query
 * @throws SQLException if building or running the query fails
 */
protected TableRecords beforeImage() throws SQLException {
    SQLDeleteRecognizer deleteRecognizer = (SQLDeleteRecognizer) sqlRecognizer;
    TableMeta tableMeta = getTableMeta(deleteRecognizer.getTableName());
    // Filled in while the SQL is generated, then handed to the query as its bind parameters.
    ArrayList<List<Object>> bindParams = new ArrayList<>();
    String lockingSelect = buildBeforeImageSQL(deleteRecognizer, tableMeta, bindParams);
    return buildTableRecords(tableMeta, lockingSelect, bindParams);
}
/**
 * Generates the before-image query for a DELETE:
 * {@code SELECT <all columns> FROM <table> [WHERE ...] [ORDER BY ...] [LIMIT ...] FOR UPDATE}.
 *
 * @param visitor           recognizer of the DELETE statement
 * @param tableMeta         metadata of the table being deleted from
 * @param paramAppenderList receives the bind parameters collected while building the
 *                          WHERE and LIMIT parts
 * @return the complete SELECT ... FOR UPDATE statement
 */
private String buildBeforeImageSQL(SQLDeleteRecognizer visitor, TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
    String where = buildWhereCondition(visitor, paramAppenderList);

    // Build everything that follows the column list.
    StringBuilder tail = new StringBuilder(" FROM ");
    tail.append(getFromTableInSQL());
    if (StringUtils.isNotBlank(where)) {
        tail.append(WHERE);
        tail.append(where);
    }

    String orderByClause = visitor.getOrderBy();
    if (StringUtils.isNotBlank(orderByClause)) {
        tail.append(orderByClause);
    }

    ParametersHolder holder = null;
    if (statementProxy instanceof ParametersHolder) {
        holder = (ParametersHolder) statementProxy;
    }
    String limitClause = visitor.getLimit(holder, paramAppenderList);
    if (StringUtils.isNotBlank(limitClause)) {
        tail.append(limitClause);
    }

    // Note: the rows are locked here.
    tail.append(" FOR UPDATE");

    // The joiner wraps the comma-separated column list between "SELECT " and the tail.
    StringJoiner columns = new StringJoiner(", ", "SELECT ", tail.toString());
    for (String name : tableMeta.getAllColumns().keySet()) {
        String escaped = ColumnUtils.addEscape(name, getDbType());
        columns.add(getColumnNameInSQL(escaped));
    }
    return columns.toString();
}
afterImage
/**
 * Produces the after-image for a DELETE.
 *
 * <p>This implementation always returns an empty record set created from this executor's
 * table metadata.
 *
 * @param beforeImage the before-image; this method does not read it
 * @return an empty {@code TableRecords} for the target table
 * @throws SQLException declared by the signature; not thrown directly by this body
 */
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    return TableRecords.empty(this.getTableMeta());
}
MultiExecutor
beforeImage
update: MultiUpdateExecutor
delete: MultiDeleteExecutor
/**
 * Captures a before-image for every group of statements in a multi-statement execution.
 *
 * <p>The recognizers are grouped by table name. For each group, the SQL type of the group's
 * first recognizer selects the delegate ({@code MultiUpdateExecutor} or
 * {@code MultiDeleteExecutor}); the delegate's before-image is stored in
 * {@code beforeImagesMap} under that first recognizer.
 *
 * @return always {@code null}; the images are kept in {@code beforeImagesMap}
 * @throws SQLException if a delegate fails to build its before-image
 * @throws UnsupportedOperationException if a group's first statement is neither UPDATE nor DELETE
 */
protected TableRecords beforeImage() throws SQLException {
    // Group the recognizers by the table they target.
    multiSqlGroup = sqlRecognizers.stream().collect(Collectors.groupingBy(r -> r.getTableName()));
    for (List<SQLRecognizer> group : multiSqlGroup.values()) {
        SQLRecognizer head = group.get(0);
        AbstractDMLBaseExecutor<T, S> delegate;
        switch (head.getSQLType()) {
            case UPDATE:
                delegate = new MultiUpdateExecutor<T, S>(statementProxy, statementCallback, group);
                break;
            case DELETE:
                delegate = new MultiDeleteExecutor<T, S>(statementProxy, statementCallback, group);
                break;
            default:
                throw new UnsupportedOperationException("not support sql" + head.getOriginalSQL());
        }
        beforeImagesMap.put(head, delegate.beforeImage());
    }
    return null;
}
afterImage
update: MultiUpdateExecutor
delete: MultiDeleteExecutor
/**
 * Captures an after-image for every group of statements in a multi-statement execution.
 *
 * <p>Iterates the groups held in {@code multiSqlGroup}. For each group, the SQL type of the
 * group's first recognizer selects the delegate ({@code MultiUpdateExecutor} or
 * {@code MultiDeleteExecutor}); the delegate is given the before-image stored for that
 * recognizer in {@code beforeImagesMap}, and its after-image is stored in
 * {@code afterImagesMap} under the same recognizer.
 *
 * @param beforeImage not read; each group's before-image is looked up in {@code beforeImagesMap}
 * @return always {@code null}; the images are kept in {@code afterImagesMap}
 * @throws SQLException if a delegate fails to build its after-image
 * @throws UnsupportedOperationException if a group's first statement is neither UPDATE nor DELETE
 */
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
    for (List<SQLRecognizer> group : multiSqlGroup.values()) {
        SQLRecognizer head = group.get(0);
        AbstractDMLBaseExecutor<T, S> delegate;
        switch (head.getSQLType()) {
            case UPDATE:
                delegate = new MultiUpdateExecutor<T, S>(statementProxy, statementCallback, group);
                break;
            case DELETE:
                delegate = new MultiDeleteExecutor<T, S>(statementProxy, statementCallback, group);
                break;
            default:
                throw new UnsupportedOperationException("not support sql" + head.getOriginalSQL());
        }
        TableRecords groupBefore = beforeImagesMap.get(head);
        afterImagesMap.put(head, delegate.afterImage(groupBefore));
    }
    return null;
}