(1). 前言

前面剖析了全局事务的启动过程,可是分支事务和全局事务在不同的进程里,分支事务与全局事务是如何关联的呢? 核心逻辑就在:omega-transport工程里(其实,你会发现跟ZipKin好像.)

# 整个传输层,是一工程层抽象,可以根据项目的要求:依赖不同的实现(dubbo/feig/resttemplate...).
# 我这里依赖的是:omega-transport-resttemplate
lixin-macbook:omega lixin$ tree omega-transport -L 1
omega-transport
├── omega-transport-dubbo
├── omega-transport-feign
├── omega-transport-hystrix
├── omega-transport-resttemplate
├── omega-transport-servicecomb
├── omega-transport.iml
├── pom.xml
└── target

(2). 如何入手?

查看omega-transport-resttemplate工程信息,我们知道,这是一个Spring Boot的工程,Spring Boot在启动时,会通过SPI加载:spring.factories里的内容,并实例化配置的类.

RestTemplateConfig用于传播全局事务ID到分支事务里. WebConfig用于从Http协议头,获取全局事务ID,绑定到当前线程里.

"omega-transport-resttemplate"

(3). RestTemplateConfig

创建:RestTemplate实例,并为它配置拦截器.

@Configuration
public class RestTemplateConfig {

  // 1. 创建:RestTemplate实例
  @Bean(name = "omegaRestTemplate")
  public RestTemplate omegaRestTemplate(
          // 注入:OmegaContext
          @Autowired(required = false) OmegaContext context,
		  RestTemplateBuilder restTemplateBuilder) {
    return restTemplateBuilder
	     // 添加拦截器:TransactionClientHttpRequestInterceptor
        .additionalInterceptors(new TransactionClientHttpRequestInterceptor(context))
        .build();
  }// end 
}

(4). TransactionClientHttpRequestInterceptor

拦截RestTemplate的所有请求,当线程上下文中存在全局事务ID(globalTxId),则,把全局事务ID(globalTxId)和本地事务ID(localTxId),添加到HTTP协议头里.

class TransactionClientHttpRequestInterceptor implements org.springframework.http.client.ClientHttpRequestInterceptor {
  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
  private final OmegaContext omegaContext;

  TransactionClientHttpRequestInterceptor(OmegaContext omegaContext) {
    this.omegaContext = omegaContext;
  }

  @Override
  public ClientHttpResponse intercept(
                            HttpRequest request, byte[] body,
							ClientHttpRequestExecution execution) throws IOException {
	
	//线程上下文(OmegaContext)里的全局事务ID,如果,存在全局事务ID的话
    // 为HttpRequest添加协议头(X-Pack-Global-Transaction-Id/X-Pack-Local-Transaction-Id)
    if (omegaContext!= null && omegaContext.globalTxId() != null) {
      request.getHeaders().add(GLOBAL_TX_ID_KEY, omegaContext.globalTxId());
      request.getHeaders().add(LOCAL_TX_ID_KEY, omegaContext.localTxId());
      
      LOG.debug("Added {} {} and {} {} to request header",
          GLOBAL_TX_ID_KEY,
          omegaContext.globalTxId(),
          LOCAL_TX_ID_KEY,
          omegaContext.localTxId());
    } else {
      LOG.debug("Cannot inject transaction ID, as the OmegaContext is null or cannot get the globalTxId.");
    }
	
    return execution.execute(request, body);
  }
  
}

(5). WebConfig

为Spring MVC项目,添加拦截器.

@Configuration
@EnableWebMvc
// 1. 给Spring的项目添加:Interceptor
//    为什么不直接添加Filter,如果集成的项目是Struts或者原生Servlet,是否有做考虑?  
public class WebConfig extends WebMvcConfigurerAdapter {

  private final OmegaContext omegaContext;

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());


  @Autowired
  public WebConfig(@Autowired(required=false) OmegaContext omegaContext) {
    this.omegaContext = omegaContext;
  }

  @Override
  public void addInterceptors(InterceptorRegistry registry) {
    if (omegaContext == null) {
      LOG.info("The OmegaContext is not injected, The transaction handler is disabled");
    }
	// ************************************************************
	// 为Spring的项目,添加拦截器.
	// ************************************************************
    registry.addInterceptor(new TransactionHandlerInterceptor(omegaContext));
  }
}

(6). TransactionHandlerInterceptor

TransactionHandlerInterceptor实际是在分支事务侧来着的,它会拦截所有的MVC请求,然后,获得全局事务ID和分支事务ID绑定到当前线程里.

// HandlerInterceptor为Spring的拦截器
class TransactionHandlerInterceptor implements org.springframework.web.servlet.HandlerInterceptor {

  private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());

  private final OmegaContext omegaContext;

  TransactionHandlerInterceptor(OmegaContext omegaContext) {
    this.omegaContext = omegaContext;
  }

  //  请求Halder之前,走该方法
  @Override
  public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
	// 从Http的协议头(X-Pack-Global-Transaction-Id)里,获得全局事务ID,如果全局事务ID存在,则绑定到线程上下文里(OmegaContext)
    if (omegaContext != null) {
      String globalTxId = request.getHeader(GLOBAL_TX_ID_KEY);
      if (globalTxId == null) {
        LOG.debug("Cannot inject transaction ID, no such header: {}", GLOBAL_TX_ID_KEY);
      } else {
		// 绑定全局事务ID和分支事务ID到线程上下文里.
        omegaContext.setGlobalTxId(globalTxId);
        omegaContext.setLocalTxId(request.getHeader(LOCAL_TX_ID_KEY));
      }
    } else {
      LOG.debug("Cannot inject transaction ID, as the OmegaContext is null.");
    }
    return true;
  }

  @Override
  public void postHandle(HttpServletRequest request, HttpServletResponse response, Object o, ModelAndView mv) {
  }

  // 请求目标方法结束以后,会走该方法
  @Override
  public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object o, Exception e) {
    if (omegaContext != null) {
		// **********************************************
		// 方法结束后,清场
		// **********************************************
      omegaContext.clear();
    }
  }
}

(7). 总结

TransactionClientHttpRequestInterceptor负责把全局事务和本地事务,传播(添加HTTP协议头)到分支事务里. TransactionHandlerInterceptor负责从HTTP协议头里取出,上面传递的全局事务和本地事务,绑定到当前线程上下文里.