Seata AT 分支事务

引言

前面,我们已经介绍了 Seata 的整体设计思想,接下来我们深入到其实现细节中,本文介绍 Seata 中 AT 模式分支事务的实现,其他 Seata 相关文章均收录于 <Seata系列文章>中。

AT 模式

前面在介绍 Seata 入口时, 大家可能会注意到 GlobalTransactionScanner 中还存在一个数据源的代理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 替换默认的数据库连接源, 改为 AT 模式的数据源代理
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof DataSource && !(bean instanceof DataSourceProxy) && ConfigurationFactory.getInstance().getBoolean(DATASOURCE_AUTOPROXY, false)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Auto proxy of [" + beanName + "]");
}
DataSourceProxy dataSourceProxy = DataSourceProxyHolder.get().putDataSource((DataSource) bean);
return Enhancer.create(bean.getClass(), (org.springframework.cglib.proxy.MethodInterceptor) (o, method, args, methodProxy) -> {
Method m = BeanUtils.findDeclaredMethod(DataSourceProxy.class, method.getName(), method.getParameterTypes());
if (null != m) {
return m.invoke(dataSourceProxy, args);
} else {
return method.invoke(bean, args);
}
});
}
return bean;
}

我们知道, AT 模式对业务是无侵入的, 而它之所以能做到这种效果, 就是通过代理数据源, 在 SQL 执行结束但还没有提交的时候, 它会分析 SQL 的执行效果, 获取到它会使用到哪些行, 然后将这些使用到的行主键作为 key, 向 TC 申请全局写锁, 如果成功获得了锁, 才进行提交, 否则回滚。看起来好像很简单, 但是实际做起来还是挺难的, 有几个现实的问题: 怎么觉察到这个 SQL 执行在全局事务中? 所有 SQL 操作都需要嵌入 Seata 的逻辑么? 怎么知道该 SQL 都需要修改哪些行的? 当全局事务回滚时, 它又是怎么恢复回去的呢 ?

带着这些问题, 我们从 DataSourceProxy 开始, 一窥 AT 模式的实现内核。熟悉 DataSource 接口的同学可能知道, 实际上 SQL 的执行并不在该接口中, 在 DataSource 中我们能得到数据库 Connection, 而在 Connection 我们又能得到 Statement, 而这个 Statement 接口才是真正执行 SQL 的地方。所以, 我就不给大家展示这个嵌套的调用过程了, 我们直接进去看 Seata 如何实现 Statement 接口, 毕竟这里才是 SQL 执行的核心。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class StatementProxy<T extends Statement> extends AbstractStatementProxy<T> {
// ...
@Override
public ResultSet executeQuery(String sql) throws SQLException {
this.targetSQL = sql;
return ExecuteTemplate.execute(this, new StatementCallback<ResultSet, T>() {
@Override
public ResultSet execute(Statement statement, Object... args) throws SQLException {
return statement.executeQuery((String) args[0]);
}
}, sql);
}
// ...
}

我们可以看到 Seata 并没有在 StatementProxy 写实质性的内容, 而是将工作委托给 ExecuteTemplate, 它们可真爱用模板方法模式, 好吧, 我们去看看 ExecuteTemplate 都干了啥:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
public class ExecuteTemplate {
public static <T, S extends Statement> T execute(SQLRecognizer sqlRecognizer,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
// RootContext ThreadLocal 中存了 xid 和是否需要 global lock, 前面介绍拦截器的时候, 我并没有展示 GlobalLock 拦截器的代码,
// 实际上它就是修改了一下标志位, 具体需不需要向 TC 确认锁, 是在 SQL 执行的时候才知道
// 从这里我们可以看出, 只有当存在全局事务, 或者需要全局锁的时候, 才会加入 Seata 的流程, 否则用默认的 statement 直接执行
if (!RootContext.inGlobalTransaction() && !RootContext.requireGlobalLock()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
// 根据 db 的不同, 获取不同的分析器, Mysql Oracle
if (sqlRecognizer == null) {
sqlRecognizer = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
statementProxy.getConnectionProxy().getDbType());
}
Executor<T> executor = null;
if (sqlRecognizer == null) {
// PlainExecutor 是 jdbc 原始执行器, 不包含 Seata 的逻辑
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
} else {
// 分析出 SQL 的类型, 调用不同的执行器, 这里我们会发现 Select 语句会直接用原生的 PlainExecutor 执行
// 也正是如此才说 Seata 默认执行在读未提交的隔离级别下(直接 select 查询, 并不会找 TC 确认锁的情况), 正如前面说的, 读已提交隔离级别是通过 SELECT_FOR_UPDATE + GlobalLock 联合来实现
switch (sqlRecognizer.getSQLType()) {
case INSERT:
executor = new InsertExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case UPDATE:
executor = new UpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case DELETE:
executor = new DeleteExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
case SELECT_FOR_UPDATE:
executor = new SelectForUpdateExecutor<T, S>(statementProxy, statementCallback, sqlRecognizer);
break;
default:
executor = new PlainExecutor<T, S>(statementProxy, statementCallback);
break;
}
}
T rs = null;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException)ex;
}
return rs;
}
}

到 ExecuteTemplate 这一层终于开始干实事了:

  1. 判断是不是在全局事务中, 是不是有全局锁的需求
  2. 分析 SQL, 只代理 Create, Update, Delete 过程和 SelectForUpdate
  3. 调用最终采用的执行器

在 ExecuteTemplate 有两个难点, 一个是 SQL 分析器, 另一个是 SQL 执行器, 分析器就是根据不同数据库 SQL 的关键字构建抽象语法树, 然后得出该 SQL 是什么类型的, 因为这部分大部分是语法分析, 比较繁杂, 而且 Seata 也是调用了另一个库(druid)提供的功能, 我们这里就不展开介绍了。我们着重看一下 Seata 通过分析结果, 怎么做到无侵入的。

1
2
3
4
5
6
7
8
// AbstractDMLBaseExecutor
protected T executeAutoCommitFalse(Object[] args) throws Exception {
TableRecords beforeImage = beforeImage();
T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
TableRecords afterImage = afterImage(beforeImage);
prepareUndoLog(beforeImage, afterImage);
return result;
}

这里先看一下所有 SQL 执行器的基类 AbstractDMLBaseExecutor 的工作流程:

  1. 获取执行前快照
  2. 执行原始 SQL
  3. 获取执行后快照
  4. 准备回滚日志

有一个需要注意的点是,上述的所有过程要在一个本地事务中完成,如果本地事务默认是自动提交的话,Seata 会先将其改为不自动提交,再开始上述过程。无论是 Create, Update, 还是 Delete, 都是走这一个流程出来的, 它们的不同点就在于 beforeImage 和 afterImage 的实现。我们来看看它们三个有啥不同:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Create
@Override
protected TableRecords beforeImage() throws SQLException {
return TableRecords.empty(getTableMeta());
}

@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
//Pk column exists or PK is just auto generated
List<Object> pkValues = containsPK() ? getPkValuesByColumn() :
(containsColumns() ? getPkValuesByAuto() : getPkValuesByColumn());

TableRecords afterImage = buildTableRecords(pkValues);

if (afterImage == null) {
throw new SQLException("Failed to build after-image for insert");
}

return afterImage;
}

可以看到, Create 过程的 beforeImage 是空, afterImage 是先获取主键列表, 然后 buildTableRecords 构建查询 select * form table where pk in (pk list)。而获取主键的方式,这里分两种情况:

  1. 主键在业务 SQL 的数据中
    • 查询该表的 meta 信息, 得到 pk 的列名, 然后和业务 SQL 比对, 看看业务 SQL 中包不包含 pk, 如果包含则直接拿出来
  2. 主键是自动生成的
    • 插入操作执行完成后, 是可以指定让其返回主键的, 如果执行 SQL 时没有指定返回主键,Seata 会使用 SELECT LAST_INSERT_ID(), 但是这其实并不可靠, 数据可能被其他插入过程污染, 所以当这种情况发生时, Seata 会打印一条警告

      Fail to get auto-generated keys, use SELECT LAST_INSERT_ID() instead. Be cautious, statement could be polluted. Recommend you set the statement to return generated keys.

InsertExecutor 的核心内容就是这些, 我们再看看 UpdateExecutor:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// Update
@Override
protected TableRecords beforeImage() throws SQLException {

ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
TableMeta tmeta = getTableMeta();
String selectSQL = buildBeforeImageSQL(tmeta, paramAppenderList);
return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}
// 构建 BeforeImage 的 SQL
private String buildBeforeImageSQL(TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
// 更新的列中不包含主键, 就将主键也加入进去
if (!tableMeta.containsPK(updateColumns)) {
prefix.append(getColumnNameInSQL(tableMeta.getPkName()) + ", ");
}
// 至此 SQL 前缀构建完成, 开始构建 SQL 后缀
StringBuilder suffix = new StringBuilder(" FROM " + getFromTableInSQL());
String whereCondition = buildWhereCondition(recognizer, paramAppenderList);
// 基于原始 SQL 中的查询条件
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(" WHERE " + whereCondition);
}
// 本地事务加锁, 防止 MVCC 导致的快照数据和实际修改数据条目不一致, 后面我会详细解释
suffix.append(" FOR UPDATE");
StringJoiner selectSQLJoin = new StringJoiner(", ", prefix.toString(), suffix.toString());
// 将修改的列加入 SQL, 这里只会拿修改的列 + 主键作为 Image, 减少数据规模
for (String updateColumn : updateColumns) {
selectSQLJoin.add(updateColumn);
}
return selectSQLJoin.toString();
}

@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
TableMeta tmeta = getTableMeta();
// 如果 before 快照是空, 那么 after 快照也是空
if (beforeImage == null || beforeImage.size() == 0) {
return TableRecords.empty(getTableMeta());
}
String selectSQL = buildAfterImageSQL(tmeta, beforeImage);
TableRecords afterImage = null;
PreparedStatement pst = null;
ResultSet rs = null;
// 通过 before 快照的所有 pk
try {
// 构建只通过 pk 所谓搜索条件的 SQL, 查询 pk + 更新的所有列
pst = statementProxy.getConnection().prepareStatement(selectSQL);
int index = 0;
// 将 before image 中的 pk 插入 SQL
for (Field pkField : beforeImage.pkRows()) {
index++;
pst.setObject(index, pkField.getValue(), pkField.getType());
}
rs = pst.executeQuery();
// 查询出 after image
afterImage = TableRecords.buildRecords(tmeta, rs);

} finally {
if (rs != null) {
rs.close();
}
if (pst != null) {
pst.close();
}
}
return afterImage;
}
// 构建 After Image 的 SQL
private String buildAfterImageSQL(TableMeta tableMeta, TableRecords beforeImage) throws SQLException {
SQLUpdateRecognizer recognizer = (SQLUpdateRecognizer)sqlRecognizer;
List<String> updateColumns = recognizer.getUpdateColumns();
StringBuilder prefix = new StringBuilder("SELECT ");
if (!tableMeta.containsPK(updateColumns)) {
// PK should be included.
prefix.append(getColumnNameInSQL(tableMeta.getPkName()) + ", ");
}
String suffix = " FROM " + getFromTableInSQL() + " WHERE " + buildWhereConditionByPKs(beforeImage.pkRows());
StringJoiner selectSQLJoiner = new StringJoiner(", ", prefix.toString(), suffix);
for (String column : updateColumns) {
selectSQLJoiner.add(column);
}
return selectSQLJoiner.toString();
}

我们可以看到 UpdateExecutor 复杂了很多, 不像 InsertExecutor 直接有拿主键的方式, UpdateExecutor 就得自己把主键查出来, 总结一下:

  1. before image 的查询 SQL 需要保证查询的内容包含更新的所有列 + PK
  2. 从业务 SQL 中提取更新条件, 作为快照的查询条件
  3. 对所有要更新的数据加悲观锁 Select for update
  4. 查出 before image 后, 执行业务 SQL
  5. 通过 before image 查询出来的主键, 构建 after image 的查询条件
  6. 查询 after image

这里有两个需要注意的点, 一个是 after image 的查询直接使用 before image 查出的 pk, 这样保证能用到 pk 索引, 查询更快, 为什么这么说: 因为业务 sql 可能查询条件并不一定用到索引, 那么它的效率可能就比较差, 而我们在 before image 的构建过程中, 直接就拿到了所有 pk, 直接通过 pk 查询, 因为能用到主键索引, 效率很高, 所以在构建 after image 的时候并没有像 before image 一样通过业务 SQL 来构建查询条件, 我觉得这挺巧妙的。

其次, 还有一点, 就是为什么构建 before image 的过程要加锁, 我们可以看一下如下的两个 MySQL 并发事务:

1
2
3
4
5
6
7
8
9
10
11
### 事务 1
# 1 构建 before 快照
select * from t_account where user_id >= 1;
# 3 真正的更新命令
update t_account set t_account.amount = 1000 where user_id >= 1;
# 4 构建 after 快照, 因为 before 快照只查出 user_id = 1 的数据, 所以 sql 如下
select * from t_account where user_id = 1;

### 事务 2
# 2 插入一条数据
insert into t_account (user_id, amount) values (2, 2000);

每条 SQL 命令的前面有一个标号, 该标号表示该条 SQL 的执行顺序, 如果两个并发事务按照如上顺序执行的话, 会造成什么问题呢? 我来给大家演示一下:

  1. 事务 1-1: select * from t_account where user_id >= 1;
    • t1-1
  2. 事务 2:insert into t_account (user_id, amount) values (2, 2000);
    • 执行成功
  3. 事务 1-2: update t_account set t_account.amount = 1000 where user_id >= 1;
    • 执行成功
  4. 事务 1-3: select * from t_account where user_id = 1;
    • t1-2

看着好像没啥问题啊, 但是我们看一看事务 1 提交后, DB 的状况是什么样:
t1-commit
发现问题了没, 事务 2 插入的内容也被事务 1 改写了, 但是 before image 和 after image 都没体现出来, 这样在回滚的时候就会漏数据, 为什么会这样呢? 这其实是因为 MySQL 的默认隔离级别是可重复读, 在这种隔离级别下, 由于 MVCC 的机制, 读过程确实可以保证重复不变(前提自己不能修改新加入的数据), 但是不保证以相同的筛选条件进行更新时, 更新到的数据仅是您刚才读到的数据, 它会更新所有数据,甚至包括未提交的数据。那么怎么解决呢?有两个办法, 将事务隔离界别设为 serializable, 或者显式地加悲观锁 select for update, 可能因为各个数据库的隔离级别定义可能不一致, 为了消除兼容性问题, Seata 选择了后者。在读取 before image 时加悲观锁, 会导致事务 2 被阻塞, 直到事务 1 提交, 这样就万事平安了。

然后我们看一看 DeleteExecutor 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
@Override
protected TableRecords beforeImage() throws SQLException {
SQLDeleteRecognizer visitor = (SQLDeleteRecognizer) sqlRecognizer;
TableMeta tmeta = getTableMeta(visitor.getTableName());
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
String selectSQL = buildBeforeImageSQL(visitor, tmeta, paramAppenderList);
return buildTableRecords(tmeta, selectSQL, paramAppenderList);
}
// 构建 before image 的 SQL
private String buildBeforeImageSQL(SQLDeleteRecognizer visitor, TableMeta tableMeta, ArrayList<List<Object>> paramAppenderList) {
KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
String whereCondition = buildWhereCondition(visitor, paramAppenderList);
// 构建删选条件的方式和 UpdateExecutor 一样
StringBuilder suffix = new StringBuilder(" FROM " + keywordChecker.checkAndReplace(getFromTableInSQL()));
if (StringUtils.isNotBlank(whereCondition)) {
suffix.append(" WHERE " + whereCondition);
}
// 同样的加锁,同样的原因
suffix.append(" FOR UPDATE");
// 不同点: 保存所有列
StringJoiner selectSQLAppender = new StringJoiner(", ", "SELECT ", suffix.toString());
for (String column : tableMeta.getAllColumns().keySet()) {
selectSQLAppender.add(getColumnNameInSQL(keywordChecker.checkAndReplace(column)));
}
return selectSQLAppender.toString();
}
// after image 为空
@Override
protected TableRecords afterImage(TableRecords beforeImage) throws SQLException {
return TableRecords.empty(getTableMeta());
}

在 DeleteExecutor 中 before image 的构建基本和 UpdateExecutor 一样, 唯一的不同就是它会保存所有列, 因为删除过程会删掉所有列, 雾^.^。而 after image 直接返回空。

好了,我们拿到了业务 sql 执行前的快照,业务 sql 也执行完了, 然后拿到了业务 sql 执行后的快照, 接下来的工作就是准备回滚日志了, 万一全局事务回滚了, 我们还指望着它来进行本地事务的回滚呢。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// BaseTransactionalExecutor
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
// 快照都为空, 说明 sql 实际上啥都没干, 不存在回滚日志
if (beforeImage.getRows().size() == 0 && afterImage.getRows().size() == 0) {
return;
}
ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();
// 找到所有数据涉及的主键, 如果是删除过程则从 beforeImage 拿, 否则从 afterImage 拿
TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
String lockKeys = buildLockKey(lockKeyRecords);
connectionProxy.appendLockKey(lockKeys);
// 把 SQL 类型,表名,beforeImage, afterImage, 存在一个 POJO 中
SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
connectionProxy.appendUndoLog(sqlUndoLog);
// 将所有数据存在 Context 里
}
// 构建 SQLUndoLog POJO
protected SQLUndoLog buildUndoItem(TableRecords beforeImage, TableRecords afterImage) {
SQLType sqlType = sqlRecognizer.getSQLType();
String tableName = sqlRecognizer.getTableName();
SQLUndoLog sqlUndoLog = new SQLUndoLog();
sqlUndoLog.setSqlType(sqlType);
sqlUndoLog.setTableName(tableName);
sqlUndoLog.setBeforeImage(beforeImage);
sqlUndoLog.setAfterImage(afterImage);
return sqlUndoLog;
}

看来 Executor 这一层只把用到的主键和回滚日志准备好, 并没进行实际的工作, 那么这工作是谁干的呢? 实际上是调用 ConnectionProxy 的 commit 时, 才会试图去获取锁, 并写入回滚日志, 想想确实应该是这样, 执行器就管执行业务 sql, 然后提取业务 sql 的快照信息, 谁负责提交, 就让它把提交前该做的都做了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// ConnectionProxy
@Override
public void commit() throws SQLException {
try {
// 我们看到它其实有一个重试逻辑, 因为 TC 发生锁冲突会以 fastfail 的策略通知 RM, 所以 RM 这里实现了一个重试机制
// 具体重试多少次, 重试间隔是多少, 都是在配置文件中定义
LOCK_RETRY_POLICY.execute(() -> {
doCommit();
return null;
});
} catch (SQLException e) {
throw e;
} catch (Exception e) {
throw new SQLException(e);
}
}
private void doCommit() throws SQLException {
if (context.inGlobalTransaction()) {
processGlobalTransactionCommit();
} else if (context.isGlobalLockRequire()) {
processLocalCommitWithGlobalLocks();
} else {
targetConnection.commit();
}
}
private void processGlobalTransactionCommit() throws SQLException {
try {
// RM 注册分支事务, 顺便拿着刚才得到的主键列表去加锁
register();
} catch (TransactionException e) {
recognizeLockKeyConflictException(e, context.buildLockKeys());
}

try {
if (context.hasUndoLog()) {
// 有回滚日志的话, 保存回滚日志
if (JdbcConstants.ORACLE.equalsIgnoreCase(this.getDbType())) {
UndoLogManagerOracle.flushUndoLogs(this);
} else {
UndoLogManager.flushUndoLogs(this);
}
}
// 全完事了, 提交一下
targetConnection.commit();
} catch (Throwable ex) {
LOGGER.error("process connectionProxy commit error: {}", ex.getMessage(), ex);
// 向 TC 报告一下自己提交失败了, 要是进行全局回滚的话, 不用麻烦您提醒我回滚了
report(false);
throw new SQLException(ex);
}
// 向 TC 报告一下自己操作完了, 要是进行全局回滚的话, 一定要记着提醒我回滚
report(true);
// 清空 ThreadLocal 内容
context.reset();
}

在真正提交时, 如果发现目前处于全局事务中, 它就操作起来了:

  1. 向 TC 注册分支事务, 顺便拿着刚才得到的主键列表去加锁
  2. 有回滚日志的话, 保存回滚日志
  3. 然后提交并向 TC 报告结果
  4. 清空 Context 中 ThreadLocal 的内容

至此 AT 模式的核心内容就介绍完了, 等等, 我们是不是漏了什么, 对了, 还有 Select for update 呢, SelectForUpdateExecutor 都干啥了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// SelectForUpdateExecutor
@Override
public T doExecute(Object... args) throws Throwable {
Connection conn = statementProxy.getConnection();
T rs = null;
Savepoint sp = null;
// 定义自己的重试机制, 如果重试次数超标了会在 LockRetryController#sleep 中抛出异常
LockRetryController lockRetryController = new LockRetryController();
boolean originalAutoCommit = conn.getAutoCommit();
ArrayList<List<Object>> paramAppenderList = new ArrayList<>();
// 构建查询 SQL 使用原始 sql 中使用的筛选条件, 只查出 pk 列, 同时它加了悲观锁
String selectPKSQL = buildSelectSQL(paramAppenderList);
try {
// 如果当前连接自动提交则关闭自动提交
if (originalAutoCommit) {
conn.setAutoCommit(false);
}
// 建立一个保存点, 这样之后的回滚都会回退到该保存点, 而不会将保存点之前的 sql 回滚掉
sp = conn.setSavepoint();
// 循环重试
while (true) {
try {
// 执行原始 sql
rs = statementCallback.execute(statementProxy.getTargetStatement(), args);

// Try to get global lock of those rows selected
TableRecords selectPKRows = buildTableRecords(getTableMeta(), selectPKSQL, paramAppenderList);
String lockKeys = buildLockKey(selectPKRows);
if (StringUtils.isNullOrEmpty(lockKeys)) {
break;
}
statementProxy.getConnectionProxy().checkLock(lockKeys);
break;
} catch (LockConflictException lce) {
// TC 锁冲突, 回滚到保存点, 并试着重试
conn.rollback(sp);
lockRetryController.sleep(lce);
}
}
} finally {
// 释放保存点
if (sp != null) {
conn.releaseSavepoint(sp);
}
// 如果之前修改了自动提交的配置, 则需要改回去
if (originalAutoCommit) {
conn.setAutoCommit(true);
}
}
return rs;
}

因为没有用到增删改的基类,所以很多事都得 SelectForUpdateExecutor 自己干:

  1. 自己构建重试逻辑
  2. 构建主键查询 sql
  3. 修改自动保存策略
  4. 构建保存点, 防止回滚过头
  5. 执行原始 sql
  6. 获取相关 pk
  7. 确认 pk 是否被别人占用
  8. 如果 pk 被别人占用, 则回滚并重试
  9. 否则释放保存点, 并恢复自动提交配置

这里原来的实现有点 bug, 在 @GlobalLock 模式下, 并不是在业务 sql 执行后立刻进行确认全局锁的状态, 而是在提交阶段才进行确认, 这不仅让 SelectForUpdateExecutor 的重试过程形同虚设, 而且会破坏 @GlobalLock + select for update 保证的 READ_COMMITTED 隔离级别, 大家可以看一下如下例子:

1
2
3
4
5
6
7
8
@Override
@GlobalLock
@Transactional(rollbackFor = {Throwable.class})
public void testGlobalLock() {
// select for update
baseMapper.testGlobalLock("1");
System.out.println("Hi, i got lock, i will do some thing with holding this lock.");
}

在这个事务中, 我先用 select for update 获取了 DB 资源锁, 但是因为事务没有提交, 所以 Seata 并没有确认当前是否有全局事务锁定了这些资源, 而当我执行完这些操作最后 commit 的时候, 会发现该资源其实是被锁定的, 这就说明我之前打印的 Hi, i got lock, i will do some thing with holding this lock. 其实并不该执行, 很显然这不是 READ_COMMITTED 隔离级别该有的表现, 为此, 我向官方提了这个 bug 并修正了它, 感兴趣的同学可以去看一下 issuepr, 那里更详细的叙述了该问题。

至此 1 阶段的过程就结束了, 我们将回滚日志和真正要执行的 SQL 都在本地 db 处理好了, 接下里就是等待二阶段的结果, 当 TM 进行全局事务的提交或者回滚后, 会通过 RPC 调用 RM, 这个 RPC 是 Seata 的内部RPC, 在 AT 模式下, 如果进行全局提交的话, 我们只需要异步删除回滚日志就行了, 这里 Seata 假设全局提交不会出现倒挂问题, 即进行全局提交时, 一阶段肯定已经完成, 并且将回滚日志写入 DB 了, 所以全局提交过程是直接删除回滚日志, 这里比较简单, 我们就不展示代码了, 我们着重看一下全局回滚的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
public static void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
assertDbSupport(dataSourceProxy.getDbType());
Connection conn = null;
ResultSet rs = null;
PreparedStatement selectPST = null;
for (; ; ) {
try {
conn = dataSourceProxy.getPlainConnection();
// The entire undo process should run in a local transaction.
conn.setAutoCommit(false);
// Find UNDO LOG
selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL);
selectPST.setLong(1, branchId);
selectPST.setString(2, xid);
// 根据 TC 提供的事务 ID 找到回滚日志
rs = selectPST.executeQuery();
boolean exists = false;
while (rs.next()) {
exists = true;
// 确认一下回滚日志的状态, 是不是已经处理过了, 如果已经处理就直接返回, 这能保证幂等
// It is possible that the server repeatedly sends a rollback request to roll back
// the same branch transaction to multiple processes,
// ensuring that only the undo_log in the normal state is processed.
int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
if (!canUndo(state)) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, ignore {} undo_log",
xid, branchId, state);
}
return;
}
String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
Map<String, String> context = parseContext(contextString);
Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO);
byte[] rollbackInfo = BlobUtils.blob2Bytes(b);
String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance() :
UndoLogParserFactory.getInstance(serializer);
// 解码回滚信息
BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
try {
// put serializer name to local
SERIALIZER_LOCAL.set(parser.getName());
List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
if (sqlUndoLogs.size() > 1) {
Collections.reverse(sqlUndoLogs);
}
// 挨个执行回滚 sql, 各个 sql 类型, 回滚方式也不同, 后面介绍
for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName());
sqlUndoLog.setTableMeta(tableMeta);
AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
dataSourceProxy.getDbType(),
sqlUndoLog);
undoExecutor.executeOn(conn);
}
} finally {
// remove serializer name
SERIALIZER_LOCAL.remove();
}
}
// If undo_log exists, it means that the branch transaction has completed the first phase,
// we can directly roll back and clean the undo_log
// Otherwise, it indicates that there is an exception in the branch transaction,
// causing undo_log not to be written to the database.
// For example, the business processing timeout, the global transaction is the initiator rolls back.
// To ensure data consistency, we can insert an undo_log with GlobalFinished state
// to prevent the local transaction of the first phase of other programs from being correctly submitted.
// See https://github.com/seata/seata/issues/489
if (exists) {
// 存在回滚日志, 该节点 1 阶段已经完成, 所以直接删除回滚日志, 假设不会存在倒挂问题
deleteUndoLog(xid, branchId, conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log deleted with {}",
xid, branchId, State.GlobalFinished.name());
}
} else {
// 如果回滚日志为空, 则插入一条全局事务已完成的回滚日志, 防止倒挂
insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
conn.commit();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log added with {}",
xid, branchId, State.GlobalFinished.name());
}
}
return;
} catch (SQLIntegrityConstraintViolationException e) {
// Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
if (LOGGER.isInfoEnabled()) {
LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback",
xid, branchId);
}
} catch (Throwable e) {
if (conn != null) {
try {
conn.rollback();
} catch (SQLException rollbackEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
}
}
throw new TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s %s", branchId, xid, e.getMessage()),
e);
} finally {
try {
if (rs != null) {
rs.close();
}
if (selectPST != null) {
selectPST.close();
}
if (conn != null) {
conn.close();
}
} catch (SQLException closeEx) {
LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
}
}
}
}

回滚的过程有点长, 也不是很复杂, 总结一下就是:

  1. 根据 TC 提供的事务 ID 找到回滚日志, 如果回滚日志不存在, 直接执行步骤 5
  2. 确认一下回滚日志的状态, 是不是已经处理过了, 如果已经处理就直接返回, 这能保证幂等
  3. 解码回滚信息
  4. 挨个执行回滚 sql, 各个 sql 类型, 回滚方式也不同, 后面介绍
  5. 如果回滚日志为空, 则插入一条全局事务已完成的回滚日志, 防止倒挂, 否则说明该节点 1 阶段已经完成, 所以直接删除回滚日志
  6. 在一个事务中提交上述所有步骤的 SQL

代码很好的处理了, 多次回滚和倒挂的问题, 接下来我们看看对于不同的 SQL(insert, update, delete)它们的回滚 sql 是怎么生成的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// 首先无论是什么类型的 sql, 先判断 before image 和 after image 是否相同, 如果相同则不用回滚
// 然后都会先根据 pk 查询当前的数据, 如果当前数据和 after image 一致, 则判断它和 before image 是否相同, 如果相同则不用回滚
// 否则, 说明发生脏数据, 发出警报
if (IS_UNDO_DATA_VALIDATION_ENABLE && !dataValidationAndGoOn(conn)) {
return;
}
protected boolean dataValidationAndGoOn(Connection conn) throws SQLException {

TableRecords beforeRecords = sqlUndoLog.getBeforeImage();
TableRecords afterRecords = sqlUndoLog.getAfterImage();

// Compare current data with before data
// No need undo if the before data snapshot is equivalent to the after data snapshot.
Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords);
if (beforeEqualsAfterResult.getResult()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stop rollback because there is no data change " +
"between the before data snapshot and the after data snapshot.");
}
// no need continue undo.
return false;
}

// Validate if data is dirty.
TableRecords currentRecords = queryCurrentRecords(conn);
// compare with current data and after image.
Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords);
if (!afterEqualsCurrentResult.getResult()) {

// If current data is not equivalent to the after data, then compare the current data with the before
// data, too. No need continue to undo if current data is equivalent to the before data snapshot
Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords);
if (beforeEqualsCurrentResult.getResult()) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Stop rollback because there is no data change " +
"between the before data snapshot and the current data snapshot.");
}
// no need continue undo.
return false;
} else {
if (LOGGER.isInfoEnabled()) {
if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) {
LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams());
}
}
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("check dirty datas failed, old and new data are not equal," +
"tableName:[" + sqlUndoLog.getTableName() + "]," +
"oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," +
"newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "].");
}
throw new SQLException("Has dirty records when undo.");
}
}
return true;
}
// Insert 处理过程: 删除所有新增的列, 从 afterImageRows 中提取 pk
@Override
protected String buildUndoSQL() {
KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
TableRecords afterImage = sqlUndoLog.getAfterImage();
List<Row> afterImageRows = afterImage.getRows();
if (afterImageRows == null || afterImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG");
}
Row row = afterImageRows.get(0);
Field pkField = row.primaryKeys().get(0);
// "DELETE FROM %s WHERE %s = ?"
return String.format(DELETE_SQL_TEMPLATE,
keywordChecker.checkAndReplace(sqlUndoLog.getTableName()),
keywordChecker.checkAndReplace(pkField.getName()));
}
// Delete 处理过程: 重新插入删掉的行, 从 beforeImage 中提取数据
@Override
protected String buildUndoSQL() {
KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
List<Row> beforeImageRows = beforeImage.getRows();
if (beforeImageRows == null || beforeImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG");
}
Row row = beforeImageRows.get(0);
List<Field> fields = new ArrayList<>(row.nonPrimaryKeys());
Field pkField = row.primaryKeys().get(0);
// PK is at last one.
fields.add(pkField);

String insertColumns = fields.stream()
.map(field -> keywordChecker.checkAndReplace(field.getName()))
.collect(Collectors.joining(", "));
String insertValues = fields.stream().map(field -> "?")
.collect(Collectors.joining(", "));
// "INSERT INTO %s (%s) VALUES (%s)"
return String.format(INSERT_SQL_TEMPLATE, keywordChecker.checkAndReplace(sqlUndoLog.getTableName()),
insertColumns, insertValues);
}
// Update 处理过程: 根据 before image 构建 update 的 sql, 进行数据更新
@Override
protected String buildUndoSQL() {
KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL);
TableRecords beforeImage = sqlUndoLog.getBeforeImage();
List<Row> beforeImageRows = beforeImage.getRows();
if (beforeImageRows == null || beforeImageRows.size() == 0) {
throw new ShouldNeverHappenException("Invalid UNDO LOG"); // TODO
}
Row row = beforeImageRows.get(0);
Field pkField = row.primaryKeys().get(0);
List<Field> nonPkFields = row.nonPrimaryKeys();
String updateColumns = nonPkFields.stream()
.map(field -> keywordChecker.checkAndReplace(field.getName()) + " = ?")
.collect(Collectors.joining(", "));
"UPDATE %s SET %s WHERE %s = ?";
return String.format(UPDATE_SQL_TEMPLATE, keywordChecker.checkAndReplace(sqlUndoLog.getTableName()),
updateColumns, keywordChecker.checkAndReplace(pkField.getName()));
}

至此, 整个 AT 模式的奥义就全都介绍完了, 在使用 AT 模式时, 希望各位同学注意如下几点:

  1. 凡是在全局事务中会修改的表, 如果有本地事务也要修改该表, 一定要加 @GlobalLock 注解, 如果不加的话, 可能会造成全局事务过程中的脏写, 最终导致无法回滚, 所以一定要谨慎
  2. 如果明确想要读已提交隔离级别的话, 要配合使用 @GlobalLock 和 select for update, 单独使用任何一个都是无法达到预期的哦^.^

好了, AT 模式就讲这么多, 接下来我们再看一看性能更好但是用起来更加复杂的 TCC 模式。

参考内容

[1] fescar锁设计和隔离级别的理解
[2] 分布式事务中间件 Fescar - RM 模块源码解读
[3] Fescar分布式事务实现原理解析探秘
[4] Seata TCC 分布式事务源码分析
[5] 深度剖析一站式分布式事务方案 Seata-Server
[6] 分布式事务 Seata Saga 模式首秀以及三种模式详解
[7] 蚂蚁金服大规模分布式事务实践和开源详解
[8] 分布式事务 Seata TCC 模式深度解析
[9] Fescar (Seata)0.4.0 中文文档教程
[10] Seata Github Wiki
[11] 深度剖析一站式分布式事务方案Seata(Fescar)-Server

贝克街的流浪猫 wechat
您的打赏将鼓励我继续分享!
  • 本文作者: 贝克街的流浪猫
  • 本文链接: https://www.beikejiedeliulangmao.top/middleware/seata/bt-at/
  • 版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!
  • 创作声明: 本文基于上述所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。