Fail to get auto-generated keys, use SELECT LAST_INSERT_ID() instead. Be cautious, statement could be polluted. Recommend you set the statement to return generated keys.
@Override @GlobalLock @Transactional(rollbackFor = {Throwable.class}) publicvoidtestGlobalLock(){ // select for update baseMapper.testGlobalLock("1"); System.out.println("Hi, i got lock, i will do some thing with holding this lock."); }
在这个事务中, 我先用 select for update 获取了 DB 资源锁, 但是因为事务没有提交, 所以 Seata 并没有确认当前是否有全局事务锁定了这些资源, 而当我执行完这些操作最后 commit 的时候, 会发现该资源其实是被锁定的, 这就说明我之前打印的 Hi, i got lock, i will do some thing with holding this lock. 其实并不该执行, 很显然这不是 READ_COMMITTED 隔离级别该有的表现, 为此, 我向官方提了这个 bug 并修正了它, 感兴趣的同学可以去看一下 issue 和 pr, 那里更详细地叙述了该问题。
publicstaticvoidundo(DataSourceProxy dataSourceProxy, String xid, long branchId)throws TransactionException { assertDbSupport(dataSourceProxy.getDbType()); Connection conn = null; ResultSet rs = null; PreparedStatement selectPST = null; for (; ; ) { try { conn = dataSourceProxy.getPlainConnection(); // The entire undo process should run in a local transaction. conn.setAutoCommit(false); // Find UNDO LOG selectPST = conn.prepareStatement(SELECT_UNDO_LOG_SQL); selectPST.setLong(1, branchId); selectPST.setString(2, xid); // 根据 TC 提供的事务 ID 找到回滚日志 rs = selectPST.executeQuery(); boolean exists = false; while (rs.next()) { exists = true; // 确认一下回滚日志的状态, 是不是已经处理过了, 如果已经处理就直接返回, 这能保证幂等 // It is possible that the server repeatedly sends a rollback request to roll back // the same branch transaction to multiple processes, // ensuring that only the undo_log in the normal state is processed. int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS); if (!canUndo(state)) { if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state); } return; } String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT); Map<String, String> context = parseContext(contextString); Blob b = rs.getBlob(ClientTableColumnsName.UNDO_LOG_ROLLBACK_INFO); byte[] rollbackInfo = BlobUtils.blob2Bytes(b); String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY); UndoLogParser parser = serializer == null ? 
UndoLogParserFactory.getInstance() : UndoLogParserFactory.getInstance(serializer); // 解码回滚信息 BranchUndoLog branchUndoLog = parser.decode(rollbackInfo); try { // put serializer name to local SERIALIZER_LOCAL.set(parser.getName()); List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs(); if (sqlUndoLogs.size() > 1) { Collections.reverse(sqlUndoLogs); } // 挨个执行回滚 sql, 各个 sql 类型, 回滚方式也不同, 后面介绍 for (SQLUndoLog sqlUndoLog : sqlUndoLogs) { TableMeta tableMeta = TableMetaCache.getTableMeta(dataSourceProxy, sqlUndoLog.getTableName()); sqlUndoLog.setTableMeta(tableMeta); AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor( dataSourceProxy.getDbType(), sqlUndoLog); undoExecutor.executeOn(conn); } } finally { // remove serializer name SERIALIZER_LOCAL.remove(); } } // If undo_log exists, it means that the branch transaction has completed the first phase, // we can directly roll back and clean the undo_log // Otherwise, it indicates that there is an exception in the branch transaction, // causing undo_log not to be written to the database. // For example, the business processing timeout, the global transaction is the initiator rolls back. // To ensure data consistency, we can insert an undo_log with GlobalFinished state // to prevent the local transaction of the first phase of other programs from being correctly submitted. 
// See https://github.com/seata/seata/issues/489 if (exists) { // 存在回滚日志, 该节点 1 阶段已经完成, 所以直接删除回滚日志, 假设不会存在倒挂问题 deleteUndoLog(xid, branchId, conn); conn.commit(); if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId, State.GlobalFinished.name()); } } else { // 如果回滚日志为空, 则插入一条全局事务已完成的回滚日志, 防止倒挂 insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn); conn.commit(); if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId, State.GlobalFinished.name()); } } return; } catch (SQLIntegrityConstraintViolationException e) { // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log if (LOGGER.isInfoEnabled()) { LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId); } } catch (Throwable e) { if (conn != null) { try { conn.rollback(); } catch (SQLException rollbackEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx); } } thrownew TransactionException(BranchRollbackFailed_Retriable, String.format("%s/%s %s", branchId, xid, e.getMessage()), e); } finally { try { if (rs != null) { rs.close(); } if (selectPST != null) { selectPST.close(); } if (conn != null) { conn.close(); } } catch (SQLException closeEx) { LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx); } } } }
// Compare current data with before data // No need undo if the before data snapshot is equivalent to the after data snapshot. Result<Boolean> beforeEqualsAfterResult = DataCompareUtils.isRecordsEquals(beforeRecords, afterRecords); if (beforeEqualsAfterResult.getResult()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Stop rollback because there is no data change " + "between the before data snapshot and the after data snapshot."); } // no need continue undo. returnfalse; }
// Validate if data is dirty. TableRecords currentRecords = queryCurrentRecords(conn); // compare with current data and after image. Result<Boolean> afterEqualsCurrentResult = DataCompareUtils.isRecordsEquals(afterRecords, currentRecords); if (!afterEqualsCurrentResult.getResult()) {
// If current data is not equivalent to the after data, then compare the current data with the before // data, too. No need continue to undo if current data is equivalent to the before data snapshot Result<Boolean> beforeEqualsCurrentResult = DataCompareUtils.isRecordsEquals(beforeRecords, currentRecords); if (beforeEqualsCurrentResult.getResult()) { if (LOGGER.isInfoEnabled()) { LOGGER.info("Stop rollback because there is no data change " + "between the before data snapshot and the current data snapshot."); } // no need continue undo. returnfalse; } else { if (LOGGER.isInfoEnabled()) { if (StringUtils.isNotBlank(afterEqualsCurrentResult.getErrMsg())) { LOGGER.info(afterEqualsCurrentResult.getErrMsg(), afterEqualsCurrentResult.getErrMsgParams()); } } if (LOGGER.isDebugEnabled()) { LOGGER.debug("check dirty datas failed, old and new data are not equal," + "tableName:[" + sqlUndoLog.getTableName() + "]," + "oldRows:[" + JSON.toJSONString(afterRecords.getRows()) + "]," + "newRows:[" + JSON.toJSONString(currentRecords.getRows()) + "]."); } thrownew SQLException("Has dirty records when undo."); } } returntrue; } // Insert 处理过程: 删除所有新增的列, 从 afterImageRows 中提取 pk @Override protected String buildUndoSQL(){ KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL); TableRecords afterImage = sqlUndoLog.getAfterImage(); List<Row> afterImageRows = afterImage.getRows(); if (afterImageRows == null || afterImageRows.size() == 0) { thrownew ShouldNeverHappenException("Invalid UNDO LOG"); } Row row = afterImageRows.get(0); Field pkField = row.primaryKeys().get(0); // "DELETE FROM %s WHERE %s = ?" 
return String.format(DELETE_SQL_TEMPLATE, keywordChecker.checkAndReplace(sqlUndoLog.getTableName()), keywordChecker.checkAndReplace(pkField.getName())); } // Delete 处理过程: 重新插入删掉的行, 从 beforeImage 中提取数据 @Override protected String buildUndoSQL(){ KeywordChecker keywordChecker = KeywordCheckerFactory.getKeywordChecker(JdbcConstants.MYSQL); TableRecords beforeImage = sqlUndoLog.getBeforeImage(); List<Row> beforeImageRows = beforeImage.getRows(); if (beforeImageRows == null || beforeImageRows.size() == 0) { thrownew ShouldNeverHappenException("Invalid UNDO LOG"); } Row row = beforeImageRows.get(0); List<Field> fields = new ArrayList<>(row.nonPrimaryKeys()); Field pkField = row.primaryKeys().get(0); // PK is at last one. fields.add(pkField);