(1). Overview

This section analyzes how the TC notifies every participant (RM) to commit.

(2). RmBranchCommitProcessor

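When RmNettyRemotingClient is initialized (registerProcessor), it registers a mapping from message type codes to RemotingProcessor instances. When a message arrives from the TC, the processor registered for its type code is called back. For branch commit the mapping is: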
TYPE_BRANCH_COMMIT(3) ==> RmBranchCommitProcessor
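
The code-to-processor mapping can be pictured as a small lookup table. The sketch below is only an illustration and not Seata's actual registerProcessor code: ProcessorRegistrySketch, Processor and dispatch are hypothetical names, and only the value 3 for TYPE_BRANCH_COMMIT is taken from the mapping above.

```java
import java.util.HashMap;
import java.util.Map;

final class ProcessorRegistrySketch {
    /** Hypothetical stand-in for a remoting processor: turns a message body into a result. */
    interface Processor {
        String process(Object body);
    }

    // Value taken from the mapping above: TYPE_BRANCH_COMMIT(3).
    static final short TYPE_BRANCH_COMMIT = 3;

    private final Map<Short, Processor> processors = new HashMap<>();

    /** Registers the processor that should be called back for one message type code. */
    void registerProcessor(short typeCode, Processor processor) {
        processors.put(typeCode, processor);
    }

    /** Looks up the processor by type code and invokes it; unknown codes are rejected. */
    String dispatch(short typeCode, Object body) {
        Processor processor = processors.get(typeCode);
        if (processor == null) {
            throw new IllegalStateException("no processor registered for type code " + typeCode);
        }
        return processor.process(body);
    }

    public static void main(String[] args) {
        ProcessorRegistrySketch registry = new ProcessorRegistrySketch();
        registry.registerProcessor(TYPE_BRANCH_COMMIT, body -> "branch commit: " + body);
        System.out.println(registry.dispatch(TYPE_BRANCH_COMMIT, "xid-1"));
    }
}
```

The explicit failure for an unregistered code is a choice made for the sketch; it says nothing about how Seata treats unknown message types.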

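(3). First, a word about DefaultRMHandler

What is DefaultRMHandler?
A: DefaultRMHandler is an implementation of TransactionMessageHandler. Its main job is to receive the messages sent by the TC and pass them on for processing.
When is DefaultRMHandler initialized?
A: It is initialized in RMClient.init, through the call to DefaultRMHandler.get().
What does DefaultRMHandler do during initialization?
A: It loads every implementation of AbstractRMHandler through SPI and caches them in a Map.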

public class DefaultRMHandler extends AbstractRMHandler {
	// *****************************************************
	// Registry of all AbstractRMHandler implementations, keyed by BranchType. Sample contents:
	// {
	//	TCC=io.seata.rm.tcc.RMHandlerTCC@78eaffd7, 
	//  AT=io.seata.rm.RMHandlerAT@762d0eba, 
	//  SAGA=io.seata.saga.rm.RMHandlerSaga@721294a7, 
	//  XA=io.seata.rm.RMHandlerXA@37100481
	// }
	// *****************************************************
	protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap = new ConcurrentHashMap<>();
	
	// 3.1 DefaultRMHandler.get()
	public static AbstractRMHandler get() {
		return DefaultRMHandler.SingletonHolder.INSTANCE;
	} // end
	
	// 3.2 Lazy initialization through a static holder class
	private static class SingletonHolder {
		private static AbstractRMHandler INSTANCE = new DefaultRMHandler();
	} //end
	
	// 3.3 Constructor
	protected DefaultRMHandler() {
		initRMHandlers();
	} //end

	// 3.4 initRMHandlers
	protected void initRMHandlers() {
		// 3.5 Load every AbstractRMHandler implementation through SPI and register it in DefaultRMHandler.allRMHandlersMap.
		List<AbstractRMHandler> allRMHandlers = EnhancedServiceLoader.loadAll(AbstractRMHandler.class);
		if (CollectionUtils.isNotEmpty(allRMHandlers)) {
			for (AbstractRMHandler rmHandler : allRMHandlers) {
				
				allRMHandlersMap.put(rmHandler.getBranchType(), rmHandler);
			}
		}
	}// end 
}
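
The SingletonHolder in step 3.2 relies on the JVM initializing a nested class only on first use. The following is a minimal, self-contained sketch of that idiom. HolderIdiomSketch and its CONSTRUCTED counter are hypothetical names added here to make the lazy, one-time construction visible; they are not part of Seata.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class HolderIdiomSketch {
    // Counts constructor calls so the lazy, one-time initialization can be observed.
    static final AtomicInteger CONSTRUCTED = new AtomicInteger();

    private HolderIdiomSketch() {
        CONSTRUCTED.incrementAndGet();
    }

    // The JVM initializes Holder only when get() first reads Holder.INSTANCE.
    // Class initialization is thread-safe, so no explicit locking is needed.
    private static final class Holder {
        static final HolderIdiomSketch INSTANCE = new HolderIdiomSketch();
    }

    static HolderIdiomSketch get() {
        return Holder.INSTANCE;
    }

    public static void main(String[] args) {
        System.out.println("constructed before get(): " + CONSTRUCTED.get());
        HolderIdiomSketch first = get();
        HolderIdiomSketch second = get();
        System.out.println("constructed after get(): " + CONSTRUCTED.get());
        System.out.println("same instance: " + (first == second));
    }
}
```

In DefaultRMHandler the constructor additionally runs initRMHandlers(), so the SPI scan happens once, at the first call to get().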

(4). RmBranchCommitProcessor.process

public class RmBranchCommitProcessor implements RemotingProcessor {

    private static final Logger LOGGER = LoggerFactory.getLogger(RmBranchCommitProcessor.class);
	
	// Set through rmNettyRemotingClient.setTransactionMessageHandler(DefaultRMHandler.get()),
	// i.e. obtained from the static factory when RMClient is initialized.
	// Runtime type: io.seata.rm.DefaultRMHandler
    private TransactionMessageHandler handler;

	// RmNettyRemotingClient
    private RemotingClient remotingClient;

	
	// 4.1 RmBranchCommitProcessor is constructed in RmNettyRemotingClient.registerProcessor
    public RmBranchCommitProcessor(TransactionMessageHandler handler, RemotingClient remotingClient) {
        this.handler = handler;
        this.remotingClient = remotingClient;
    }

	// 4.2 Invoked when the TC sends a branch commit notification
    @Override
    public void process(ChannelHandlerContext ctx, RpcMessage rpcMessage) throws Exception {
		// Resolve the remote address of the TC
        String remoteAddress = NetUtil.toStringAddress(ctx.channel().remoteAddress());
		// Extract the message body
        Object msg = rpcMessage.getBody();
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("rm client handle branch commit process:" + msg);
        }
		
		// 4.3 Delegate to the local method handleBranchCommit
        handleBranchCommit(rpcMessage, remoteAddress, (BranchCommitRequest) msg);
    }

	// 4.3 handleBranchCommit
    private void handleBranchCommit(RpcMessage request, String serverAddress, BranchCommitRequest branchCommitRequest) {
        // The response message that will be returned to the TC
		BranchCommitResponse resultMessage;
		
		// **********************************************************
		// Delegate the request to DefaultRMHandler.onRequest(), covered in section (5)
		// **********************************************************
        resultMessage = (BranchCommitResponse) handler.onRequest(branchCommitRequest, null);
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("branch commit result:" + resultMessage);
        }
        try {
			// **********************************************************
			// Send the response to the TC to report the commit status.
			// **********************************************************
            this.remotingClient.sendAsyncResponse(serverAddress, request, resultMessage);
        } catch (Throwable throwable) {
            LOGGER.error("branch commit error: {}", throwable.getMessage(), throwable);
        }
    }
}

(5). DefaultRMHandler.onRequest

DefaultRMHandler is a subclass of AbstractRMHandler, and onRequest is inherited from it.

// AbstractRMHandler.onRequest
public AbstractResultMessage onRequest(AbstractMessage request, RpcContext context) {
	// Only messages of type AbstractTransactionRequestToRM are handled; anything else is rejected
	if (!(request instanceof AbstractTransactionRequestToRM)) {
		throw new IllegalArgumentException();
	}
	// Cast the request to AbstractTransactionRequestToRM
	AbstractTransactionRequestToRM transactionRequest = (AbstractTransactionRequestToRM)request;
	
	// ******************************************************************
	// 5. Register this object (the DefaultRMHandler instance) as the request's RM inbound message handler,
	//    equivalent to transactionRequest.setRMInboundMessageHandler(DefaultRMHandler.this);
	// ******************************************************************
	transactionRequest.setRMInboundMessageHandler(this);
	
	// ******************************************************************
	// transactionRequest.handle(context) calls back into DefaultRMHandler.handle (see BranchCommitRequest.handle below)
	// ******************************************************************
	return transactionRequest.handle(context);
} // end onRequest

// The message the RM receives from the TC in this flow is a BranchCommitRequest
public class BranchCommitRequest extends AbstractBranchEndRequest {

    @Override
    public short getTypeCode() {
        return MessageType.TYPE_BRANCH_COMMIT;
    }

    @Override
    public AbstractTransactionResponse handle(RpcContext rpcContext) {
		// handler == DefaultRMHandler
        return handler.handle(this);
    }
}
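
The round trip between onRequest and BranchCommitRequest.handle is a form of double dispatch: the handler is attached to the request, and the request then calls the handler overload that matches its own type. Below is a minimal sketch of that shape. All type names in it (InboundHandler, Request, CommitRequest, RollbackRequest) are hypothetical simplifications, not Seata classes.

```java
final class DoubleDispatchSketch {
    /** Hypothetical handler with one overload per request type. */
    interface InboundHandler {
        String handle(CommitRequest request);
        String handle(RollbackRequest request);
    }

    /** Hypothetical request base type; each subclass picks the overload matching itself. */
    abstract static class Request {
        protected InboundHandler handler;

        void setHandler(InboundHandler handler) {
            this.handler = handler;
        }

        abstract String handle();
    }

    static final class CommitRequest extends Request {
        @Override
        String handle() {
            return handler.handle(this); // resolves to handle(CommitRequest)
        }
    }

    static final class RollbackRequest extends Request {
        @Override
        String handle() {
            return handler.handle(this); // resolves to handle(RollbackRequest)
        }
    }

    /** Same shape as onRequest: attach the handler, then let the request call back. */
    static String onRequest(Request request, InboundHandler handler) {
        request.setHandler(handler);
        return request.handle();
    }

    public static void main(String[] args) {
        InboundHandler handler = new InboundHandler() {
            @Override public String handle(CommitRequest request) { return "commit"; }
            @Override public String handle(RollbackRequest request) { return "rollback"; }
        };
        System.out.println(onRequest(new CommitRequest(), handler));
        System.out.println(onRequest(new RollbackRequest(), handler));
    }
}
```

The benefit of this arrangement is that onRequest needs no instanceof chain per message type; the request object itself selects the right overload.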

(6). DefaultRMHandler.handle

// domain
// allRMHandlersMap = {
//	TCC=io.seata.rm.tcc.RMHandlerTCC@78eaffd7, 
//  AT=io.seata.rm.RMHandlerAT@762d0eba, 
//  SAGA=io.seata.saga.rm.RMHandlerSaga@721294a7, 
//  XA=io.seata.rm.RMHandlerXA@37100481
// }
protected static Map<BranchType, AbstractRMHandler> allRMHandlersMap = new ConcurrentHashMap<>();

// methods
protected AbstractRMHandler getRMHandler(BranchType branchType) {
	return allRMHandlersMap.get(branchType);
}

// allRMHandlersMap was populated through SPI when DefaultRMHandler was initialized
public BranchCommitResponse handle(BranchCommitRequest request) {
	// Look up the handler registered for this branch type and delegate to it
	return getRMHandler(request.getBranchType()).handle(request);
}

(7). RMHandlerAT.handle

RMHandlerAT is a subclass of AbstractRMHandler; handle(BranchCommitRequest) is inherited from AbstractRMHandler.

// AbstractRMHandler.handle(BranchCommitRequest request)
public BranchCommitResponse handle(BranchCommitRequest request) {
	// 7.1 Create the branch commit response
	BranchCommitResponse response = new BranchCommitResponse();
	// 7.2 Create an anonymous AbstractCallback
	// 7.3 Pass it to exceptionHandleTemplate(AbstractCallback)
	exceptionHandleTemplate(new AbstractCallback<BranchCommitRequest, BranchCommitResponse>() {
		@Override
		public void execute(BranchCommitRequest request, BranchCommitResponse response)
			throws TransactionException {
			 // ****************************************************************
			 // 8. doBranchCommit
			 // ****************************************************************
			doBranchCommit(request, response);
		}
	}, request, response);
	return response;
} // end handle


public <T extends AbstractTransactionRequest, S extends AbstractTransactionResponse> void exceptionHandleTemplate(Callback<T, S> callback, T request, S response) {
	try {
		// 7.4 Call execute first
		callback.execute(request, response);
		// 7.5 Then call onSuccess
		callback.onSuccess(request, response);
	} catch (TransactionException tex) {
		LOGGER.error("Catch TransactionException while do RPC, request: {}", request, tex);
		callback.onTransactionException(request, response, tex);
	} catch (RuntimeException rex) {
		LOGGER.error("Catch RuntimeException while do RPC, request: {}", request, rex);
		callback.onException(request, response, rex);
	}
} //end exceptionHandleTemplate
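
exceptionHandleTemplate is a template method around a callback: run the body, then route the outcome to exactly one hook. The sketch below reproduces only that control flow under simplified assumptions. TxException, Callback and the string-based response are hypothetical stand-ins for TransactionException, AbstractCallback and the response object.

```java
final class ExceptionTemplateSketch {
    /** Hypothetical checked exception standing in for a transaction-level failure. */
    static final class TxException extends Exception {
        TxException(String message) {
            super(message);
        }
    }

    /** Hypothetical callback: one body plus one hook per outcome. */
    interface Callback {
        void execute(StringBuilder response) throws TxException;

        default void onSuccess(StringBuilder response) {
            response.append("|success");
        }

        default void onTransactionException(StringBuilder response, TxException e) {
            response.append("|tx-failed:").append(e.getMessage());
        }

        default void onException(StringBuilder response, RuntimeException e) {
            response.append("|failed:").append(e.getMessage());
        }
    }

    /** Runs the callback and records the outcome in the response instead of throwing. */
    static String run(Callback callback) {
        StringBuilder response = new StringBuilder();
        try {
            callback.execute(response);
            callback.onSuccess(response);
        } catch (TxException e) {
            callback.onTransactionException(response, e);
        } catch (RuntimeException e) {
            callback.onException(response, e);
        }
        return response.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(r -> r.append("committed")));
        System.out.println(run(r -> { throw new TxException("lock conflict"); }));
        System.out.println(run(r -> { throw new IllegalStateException("boom"); }));
    }
}
```

Because every path ends in a hook that writes to the response, the caller always has a response to send back, whether the body succeeded or failed.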

(8). AbstractRMHandler.doBranchCommit

protected void doBranchCommit(BranchCommitRequest request, BranchCommitResponse response)
        throws TransactionException {
	// Read the fields of the message sent by the TC
	// xid
	String xid = request.getXid();
	// branchId
	long branchId = request.getBranchId();
	// resourceId
	String resourceId = request.getResourceId();
	// application data
	String applicationData = request.getApplicationData();
	if (LOGGER.isInfoEnabled()) {
		LOGGER.info("Branch committing: " + xid + " " + branchId + " " + resourceId + " " + applicationData);
	}
	
	// ***************************************************************************
	// getResourceManager() returns the DefaultResourceManager, which hands the
	// branch commit to the resource manager of the branch type (DataSourceManager.branchCommit for AT)
	// ***************************************************************************
	BranchStatus status = getResourceManager().branchCommit(request.getBranchType(), xid, branchId, resourceId,
		applicationData);
	
	// Fill in the branch commit response.
	response.setXid(xid);
	response.setBranchId(branchId);
	response.setBranchStatus(status);
	
	if (LOGGER.isInfoEnabled()) {
		LOGGER.info("Branch commit result: " + status);
	}
}

(9). DataSourceManager.branchCommit

public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
	
	// *****************************************************************
	// 10. Delegate to AsyncWorker
	// *****************************************************************
	// By design, Seata processes the AT second-phase commit asynchronously: AsyncWorker only puts the data into a queue.
	return asyncWorker.branchCommit(branchType, xid, branchId, resourceId, applicationData);
}

(10). AsyncWorker.branchCommit

AsyncWorker.branchCommit only puts the data into a queue. So when is it actually processed?

// domain 
// The bounded buffer queue
private static final BlockingQueue<Phase2Context> ASYNC_COMMIT_BUFFER = new LinkedBlockingQueue<>(
        ASYNC_COMMIT_BUFFER_LIMIT);

// methods
public BranchStatus branchCommit(BranchType branchType, String xid, long branchId, String resourceId,
                                     String applicationData) throws TransactionException {
	// 10.1 Wrap the arguments in a Phase2Context
	// 10.2 offer it to the queue; offer does not block and returns false when the queue is full
	if (!ASYNC_COMMIT_BUFFER.offer(new Phase2Context(branchType, xid, branchId, resourceId, applicationData))) {
		LOGGER.warn("Async commit buffer is FULL. Rejected branch [{}/{}] will be handled by housekeeping later.", branchId, xid);
	}
	return BranchStatus.PhaseTwo_Committed;
}
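
The behavior of offer on a bounded LinkedBlockingQueue is what makes branchCommit non-blocking: a full queue yields false instead of waiting. A minimal sketch of that behavior follows. The capacity of 2 and the helper offerAll are arbitrary choices for the demonstration and have nothing to do with ASYNC_COMMIT_BUFFER_LIMIT.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

final class BoundedOfferSketch {
    /** Offers each item in turn and records whether the queue accepted it. */
    static List<Boolean> offerAll(BlockingQueue<String> buffer, String... items) {
        List<Boolean> accepted = new ArrayList<>();
        for (String item : items) {
            // offer never blocks; it returns false when the queue is at capacity.
            accepted.add(buffer.offer(item));
        }
        return accepted;
    }

    public static void main(String[] args) {
        BlockingQueue<String> buffer = new LinkedBlockingQueue<>(2);
        // The third offer is rejected because the capacity is 2.
        System.out.println(offerAll(buffer, "branch-1", "branch-2", "branch-3")); // prints [true, true, false]
        System.out.println("queued: " + buffer.size()); // prints queued: 2
    }
}
```

Note that in the branchCommit method above, the return value is BranchStatus.PhaseTwo_Committed on both paths; a rejected offer only produces a warning.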

(11). When does AsyncWorker process the queued data (commit)?

public synchronized void init() {
	LOGGER.info("Async Commit Buffer Limit: {}", ASYNC_COMMIT_BUFFER_LIMIT);
	ScheduledExecutorService timerExecutor = new ScheduledThreadPoolExecutor(1, new NamedThreadFactory("AsyncWorker", 1, true));
	timerExecutor.scheduleAtFixedRate(() -> {
		try {
			// 11.1 Call the local method doBranchCommits
			doBranchCommits();
		} catch (Throwable e) {
			LOGGER.info("Failed at async committing ... {}", e.getMessage());
		}
	}, 10, 1000 * 1, TimeUnit.MILLISECONDS);  // initial delay 10 ms, then once per second
}// end init

private void doBranchCommits() {
	if (ASYNC_COMMIT_BUFFER.isEmpty()) {
		return;
	}
	
	Map<String, List<Phase2Context>> mappedContexts = new HashMap<>(DEFAULT_RESOURCE_SIZE);
	List<Phase2Context> contextsGroupedByResourceId;
	
	// 11.2 Drain the queue and group the contexts by resourceId
	while (!ASYNC_COMMIT_BUFFER.isEmpty()) {
		Phase2Context commitContext = ASYNC_COMMIT_BUFFER.poll();
		contextsGroupedByResourceId = CollectionUtils.computeIfAbsent(mappedContexts, commitContext.resourceId, key -> new ArrayList<>());
		contextsGroupedByResourceId.add(commitContext);
	}


	for (Map.Entry<String, List<Phase2Context>> entry : mappedContexts.entrySet()) {
		Connection conn = null;
		DataSourceProxy dataSourceProxy;
		try {
			
			try {
				// 11.3 Look up the DataSourceProxy for this resourceId and obtain a plain connection
				DataSourceManager resourceManager = (DataSourceManager) DefaultResourceManager.get()
					.getResourceManager(BranchType.AT);
				dataSourceProxy = resourceManager.get(entry.getKey());
				if (dataSourceProxy == null) {
					throw new ShouldNeverHappenException("Failed to find resource on " + entry.getKey());
				}
				conn = dataSourceProxy.getPlainConnection();
			} catch (SQLException sqle) {
				LOGGER.warn("Failed to get connection for async committing on " + entry.getKey(), sqle);
				continue;
			}
			contextsGroupedByResourceId = entry.getValue();
			Set<String> xids = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
			Set<Long> branchIds = new LinkedHashSet<>(UNDOLOG_DELETE_LIMIT_SIZE);
			
			
			for (Phase2Context commitContext : contextsGroupedByResourceId) {
				xids.add(commitContext.xid);
				branchIds.add(commitContext.branchId);
				int maxSize = Math.max(xids.size(), branchIds.size());
				if (maxSize == UNDOLOG_DELETE_LIMIT_SIZE) {
					try {
						// *********************************************************
						// 12. Batch delete the undo_log rows for these xids and branchIds
						// *********************************************************
						UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(
							xids, branchIds, conn);
					} catch (Exception ex) {
						LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
					}
					xids.clear();
					branchIds.clear();
				}
			}

			// Note: this return ends the whole method, not only the current resource group
			if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
				return;
			}

			try {
				UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).batchDeleteUndoLog(xids,
					branchIds, conn);
			} catch (Exception ex) {
				LOGGER.warn("Failed to batch delete undo log [" + branchIds + "/" + xids + "]", ex);
			}

			if (!conn.getAutoCommit()) {
				conn.commit();
			}
		} catch (Throwable e) {
			LOGGER.error(e.getMessage(), e);
			try {
				if (conn != null) {
					conn.rollback();
				}
			} catch (SQLException rollbackEx) {
				LOGGER.warn("Failed to rollback JDBC resource while deleting undo_log ", rollbackEx);
			}
		} finally {
			if (conn != null) {
				try {
					conn.close();
				} catch (SQLException closeEx) {
					LOGGER.warn("Failed to close JDBC resource while deleting undo_log ", closeEx);
				}
			}
		}
	}
} // end doBranchCommits
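
The core of doBranchCommits is two steps: drain the queue into groups keyed by resourceId, then flush each group in chunks of a fixed maximum size. The sketch below isolates those two steps without any JDBC. It is a simplification under stated assumptions: Ctx is a hypothetical cut-down Phase2Context, only branch ids are tracked (the original also collects xids), and the loop polls until null instead of checking isEmpty() first.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

final class DrainAndBatchSketch {
    /** Hypothetical, cut-down counterpart of Phase2Context. */
    static final class Ctx {
        final String resourceId;
        final long branchId;

        Ctx(String resourceId, long branchId) {
            this.resourceId = resourceId;
            this.branchId = branchId;
        }
    }

    /** Polls until the queue is empty and groups the contexts by resourceId. */
    static Map<String, List<Ctx>> drainByResource(Queue<Ctx> queue) {
        Map<String, List<Ctx>> grouped = new LinkedHashMap<>();
        Ctx ctx;
        while ((ctx = queue.poll()) != null) {
            grouped.computeIfAbsent(ctx.resourceId, key -> new ArrayList<>()).add(ctx);
        }
        return grouped;
    }

    /** Splits one group into chunks of at most limit branch ids, one chunk per DELETE. */
    static List<List<Long>> chunkBranchIds(List<Ctx> group, int limit) {
        List<List<Long>> chunks = new ArrayList<>();
        List<Long> current = new ArrayList<>();
        for (Ctx ctx : group) {
            current.add(ctx.branchId);
            if (current.size() == limit) {
                chunks.add(current);
                current = new ArrayList<>();
            }
        }
        if (!current.isEmpty()) {
            chunks.add(current); // the remainder that did not fill a whole chunk
        }
        return chunks;
    }

    public static void main(String[] args) {
        Queue<Ctx> queue = new LinkedBlockingQueue<>();
        queue.add(new Ctx("db1", 1L));
        queue.add(new Ctx("db2", 2L));
        queue.add(new Ctx("db1", 3L));
        queue.add(new Ctx("db1", 4L));

        Map<String, List<Ctx>> grouped = drainByResource(queue);
        for (Map.Entry<String, List<Ctx>> entry : grouped.entrySet()) {
            System.out.println(entry.getKey() + " -> " + chunkBranchIds(entry.getValue(), 2));
        }
    }
}
```

Grouping by resourceId matters because each group needs its own connection, and chunking keeps the IN lists of a single DELETE bounded.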

(12). UndoLogManager.batchDeleteUndoLog

AbstractUndoLogManager.batchDeleteUndoLog

public void batchDeleteUndoLog(Set<String> xids, Set<Long> branchIds, Connection conn) throws SQLException {
	if (CollectionUtils.isEmpty(xids) || CollectionUtils.isEmpty(branchIds)) {
		return;
	}
	
	int xidSize = xids.size();
	int branchIdSize = branchIds.size();
	// 12.1 Build the DELETE statement, which has the form:
	// DELETE FROM undo_log WHERE branch_id IN (?, ...) AND xid IN (?, ...)
	String batchDeleteSql = toBatchDeleteUndoLogSql(xidSize, branchIdSize);
	
	try (PreparedStatement deletePST = conn.prepareStatement(batchDeleteSql)) {
		int paramsIndex = 1;
		for (Long branchId : branchIds) {
			deletePST.setLong(paramsIndex++, branchId);
		}
		for (String xid : xids) {
			deletePST.setString(paramsIndex++, xid);
		}
		
		int deleteRows = deletePST.executeUpdate();
		if (LOGGER.isDebugEnabled()) {
			LOGGER.debug("batch delete undo log size {}", deleteRows);
		}
	} catch (Exception e) {
		if (!(e instanceof SQLException)) {
			e = new SQLException(e);
		}
		throw (SQLException) e;
	}
} // end batchDeleteUndoLog

// 12.2 Build the DELETE statement
protected static String toBatchDeleteUndoLogSql(int xidSize, int branchIdSize) {
	StringBuilder sqlBuilder = new StringBuilder(64);
	sqlBuilder.append("DELETE FROM ").append(UNDO_LOG_TABLE_NAME).append(" WHERE  ").append(
		ClientTableColumnsName.UNDO_LOG_BRANCH_XID).append(" IN ");
	appendInParam(branchIdSize, sqlBuilder);
	sqlBuilder.append(" AND ").append(ClientTableColumnsName.UNDO_LOG_XID).append(" IN ");
	appendInParam(xidSize, sqlBuilder);
	return sqlBuilder.toString();
} // end toBatchDeleteUndoLogSql
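
To make the shape of the generated statement concrete, here is a small sketch that builds a DELETE with two IN lists of JDBC placeholders. It is not Seata's appendInParam: placeholders and toBatchDeleteSql are hypothetical helpers, the exact whitespace of Seata's output may differ, and the column names branch_id and xid are taken from the SQL comment in batchDeleteUndoLog.

```java
final class BatchDeleteSqlSketch {
    /** Returns a parenthesized list of n JDBC placeholders, e.g. "(?,?,?)" for n = 3. */
    static String placeholders(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be >= 1");
        }
        StringBuilder sb = new StringBuilder("(");
        for (int i = 0; i < n; i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append('?');
        }
        return sb.append(')').toString();
    }

    /** Builds the DELETE with the branch_id list first, matching the parameter binding order. */
    static String toBatchDeleteSql(String table, int xidSize, int branchIdSize) {
        return "DELETE FROM " + table
            + " WHERE branch_id IN " + placeholders(branchIdSize)
            + " AND xid IN " + placeholders(xidSize);
    }

    public static void main(String[] args) {
        // prints DELETE FROM undo_log WHERE branch_id IN (?,?) AND xid IN (?)
        System.out.println(toBatchDeleteSql("undo_log", 1, 2));
    }
}
```

The order of the two IN lists has to match the order in which batchDeleteUndoLog binds its parameters: all branch ids first, then all xids.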

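(13). RmBranchCommitProcessor execution flow diagram

[Figure: RmBranchCommitProcessor execution flow diagram]

(14). Summary

The second phase (commit) of AT mode involves quite a lot of code.
The source shows that in AT mode the RM reports success to the TC as soon as the commit request has been handed to the queue (offer). AsyncWorker.branchCommit returns PhaseTwo_Committed right away, and when the buffer is full it only logs a warning. The undo_log rows are deleted later by the scheduled task.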