(1). 概述

在这一小篇,主要剖析:ServiceActivatingHandler.

(2). 先看下ServiceActivatingHandler类的继承关系

org.springframework.integration.context.IntegrationObjectSupport
	org.springframework.integration.handler.AbstractMessageHandler
		org.springframework.integration.handler.AbstractMessageProducingHandler
			org.springframework.integration.handler.AbstractReplyProducingMessageHandler
				org.springframework.integration.handler.ServiceActivatingHandler

(3). 看下ServiceActivatingHandler类结构图

"ServiceActivatingHandler"

(4). AbstractReplyProducingMessageHandler

为什么入口是:AbstractReplyProducingMessageHandler.onInit方法?因为:IntegrationObjectSupport方法,实现了:InitializingBean.Spring初始化时会回调:afterPropertiesSet方法.

// 从代码分析来看:afterPropertiesSet仅仅是做了一些初始化的工作.

public abstract class IntegrationObjectSupport implements BeanNameAware, NamedComponent,
		ApplicationContextAware, BeanFactoryAware, InitializingBean, ExpressionCapable {
    
	// *************************************************************************
	// 1. 由于实现了:InitializingBean,所以,Spring会回调该方法.
	// *************************************************************************
	public final void afterPropertiesSet() {
		this.integrationProperties = IntegrationContextUtils.getIntegrationProperties(this.beanFactory);
		try {
			if (this.messageBuilderFactory == null) {
				if (this.beanFactory != null) {
					this.messageBuilderFactory = IntegrationUtils.getMessageBuilderFactory(this.beanFactory);
				}
				else {
					this.messageBuilderFactory = new DefaultMessageBuilderFactory();
				}
			}
			// *************************************************************************
			// 2.委托给孙类:AbstractReplyProducingMessageHandler.onInit,因为:AbstractReplyProducingMessageHandler重写了该方法
			// *************************************************************************
			this.onInit();
		}
		catch (Exception e) {
			if (e instanceof RuntimeException) {
				throw (RuntimeException) e;
			}
			throw new BeanInitializationException("failed to initialize", e);
		}
	} // end afterPropertiesSet
	
}

public abstract class AbstractMessageHandler extends IntegrationObjectSupport implements MessageHandler,
		MessageHandlerMetrics, ConfigurableMetricsAware<AbstractMessageHandlerMetrics>, TrackableComponent, Orderable {
    
	// *************************************************************************
	// 6. 初始化
	// *************************************************************************
    protected void onInit() throws Exception {
		if (this.statsEnabled) {
			this.handlerMetrics.setFullStatsEnabled(true);
		}
	}
}

public abstract class AbstractMessageProducingHandler extends AbstractMessageHandler
		implements MessageProducer, HeaderPropagationAware {
    
	private final Set<String> notPropagatedHeaders = new HashSet<String>();
	protected final MessagingTemplate messagingTemplate = new MessagingTemplate();
	private volatile MessageChannel outputChannel;
	private volatile String outputChannelName;
	private volatile boolean async;
	private boolean selectiveHeaderPropagation;
	
	
	protected void onInit() throws Exception {
		// *************************************************************************
		// 5. 先调用父类AbstractMessageHandler.onInit()
		// *************************************************************************
		super.onInit();
		Assert.state(!(this.outputChannelName != null && this.outputChannel != null), //NOSONAR (inconsistent sync)
				"'outputChannelName' and 'outputChannel' are mutually exclusive.");
		if (getBeanFactory() != null) {
			// 配置BeanFactory
			this.messagingTemplate.setBeanFactory(getBeanFactory());
		}
		// 配置:BeanFactoryChannelResolver
		this.messagingTemplate.setDestinationResolver(getChannelResolver());
	}
} // end AbstractMessageProducingHandler


public abstract class AbstractReplyProducingMessageHandler 
		extends AbstractMessageProducingHandler
		implements BeanClassLoaderAware {
    
	// *************************************************************************
	// 3. IntegrationObjectSupport触发onInit
	// *************************************************************************
	protected final void onInit() throws Exception {
		// *************************************************************************
		// 4. 先调用父类AbstractMessageProducingHandler.onInit()
		// *************************************************************************
		
		super.onInit();
		
		if (!CollectionUtils.isEmpty(this.adviceChain)) {
			ProxyFactory proxyFactory = new ProxyFactory(new AdvisedRequestHandler());
			boolean advised = false;
			for (Advice advice : this.adviceChain) {
				if (!(advice instanceof HandleMessageAdvice)) {
					proxyFactory.addAdvice(advice);
					advised = true;
				}
			}
			if (advised) {
				this.advisedRequestHandler = (RequestHandler) proxyFactory.getProxy(this.beanClassLoader);
			}
		}
		
		// *************************************************************************
		// 7. 回调子类ServiceActivatingHandler.doInit
		// *************************************************************************
		doInit();
	} // end onInit
} // end AbstractReplyProducingMessageHandler


public class ServiceActivatingHandler extends AbstractReplyProducingMessageHandler implements Lifecycle {
	
	// 包裹业务对象
	private final MessageProcessor<?> processor;
	
	
	// *************************************************************************
	// 8. ServiceActivatingHandler处理初始化
	// *************************************************************************
	protected void doInit() {
		// 为MessageProcessor配置:ConversionService
		if (this.processor instanceof AbstractMessageProcessor) {
			((AbstractMessageProcessor<?>) this.processor).setConversionService(this.getConversionService());
		}
		
		// 为MessageProcessor配置:BeanFactory
		if (this.processor instanceof BeanFactoryAware && this.getBeanFactory() != null) {
			((BeanFactoryAware) this.processor).setBeanFactory(this.getBeanFactory());
		}
	} // end doInit
}

(5). AbstractMessageHandler.handleMessage

public abstract class AbstractMessageHandler extends IntegrationObjectSupport implements MessageHandler,
		MessageHandlerMetrics, ConfigurableMetricsAware<AbstractMessageHandlerMetrics>, TrackableComponent, Orderable {

	public void handleMessage(Message<?> message) {
		// ... ...
		MetricsContext start = null;
		boolean countsEnabled = this.countsEnabled;
		AbstractMessageHandlerMetrics handlerMetrics = this.handlerMetrics;
		try {
			if (message != null && this.shouldTrack) { // 是否为track,track对message又进行了处理.
				message = MessageHistory.write(message, this, this.getMessageBuilderFactory());
			}
			
			if (countsEnabled) {
				start = handlerMetrics.beforeHandle();
			}
			
			// ***********************************************************************
			// 委托给了孙类:AbstractReplyProducingMessageHandler进行处理
			// ***********************************************************************
			this.handleMessageInternal(message);
			
			if (countsEnabled) {
				handlerMetrics.afterHandle(start, true);
			}
		} catch (Exception e) {
			if (countsEnabled) {
				handlerMetrics.afterHandle(start, false);
			}
			
			if (e instanceof MessagingException) {
				throw (MessagingException) e;
			}
			throw new MessageHandlingException(message, "error occurred in message handler [" + this + "]", e);
		}
	} // end handleMessage

}

(6). AbstractReplyProducingMessageHandler.handleMessageInternal

public abstract class AbstractReplyProducingMessageHandler extends AbstractMessageProducingHandler
		implements BeanClassLoaderAware {

    protected final void handleMessageInternal(Message<?> message) {
		Object result;
		if (this.advisedRequestHandler == null) {
			result = handleRequestMessage(message);
		} else {
			result = doInvokeAdvisedRequestHandler(message);
		}
		
		if (result != null) {
			// ****************************************************************
			// 委托给了父类(AbstractMessageProducingHandler.sendOutputs)
			// ****************************************************************
			sendOutputs(result, message);
		} else if (this.requiresReply && !isAsync()) {
			throw new ReplyRequiredException(message, "No reply produced by handler '" + getComponentName() + "', and its 'requiresReply' property is set to true.");
		} else if (!isAsync() && logger.isDebugEnabled()) {
			logger.debug("handler '" + this + "' produced no reply for request Message: " + message);
		}
	} // end handleMessageInternal
}

(7). AbstractMessageProducingHandler.sendOutputs

protected void sendOutputs(Object result, Message<?> requestMessage) {
	if (result instanceof Iterable<?> && shouldSplitOutput((Iterable<?>) result)) {
		for (Object o : (Iterable<?>) result) {
			// ***********************************************************
			// 1.委托给了本地
			// ***********************************************************
			this.produceOutput(o, requestMessage);
		}
	} else if (result != null) {
		// ***********************************************************
		// 1. 委托给了本地
		// ***********************************************************
		this.produceOutput(result, requestMessage);
	}
} // end sendOutputs


// ***************************************************************************
// 往
// ***************************************************************************
protected void produceOutput(Object reply, final Message<?> requestMessage) {
	final MessageHeaders requestHeaders = requestMessage.getHeaders();

	Object replyChannel = null;
	if (getOutputChannel() == null) { // false
		Map<?, ?> routingSlipHeader = requestHeaders.get(IntegrationMessageHeaderAccessor.ROUTING_SLIP, Map.class);
		if (routingSlipHeader != null) {
			Assert.isTrue(routingSlipHeader.size() == 1, "The RoutingSlip header value must be a SingletonMap");
			Object key = routingSlipHeader.keySet().iterator().next();
			Object value = routingSlipHeader.values().iterator().next();
			Assert.isInstanceOf(List.class, key, "The RoutingSlip key must be List");
			Assert.isInstanceOf(Integer.class, value, "The RoutingSlip value must be Integer");
			List<?> routingSlip = (List<?>) key;
			AtomicInteger routingSlipIndex = new AtomicInteger((Integer) value);
			replyChannel = getOutputChannelFromRoutingSlip(reply, requestMessage, routingSlip, routingSlipIndex);
			if (replyChannel != null) {
				//TODO Migrate to the SF MessageBuilder
				AbstractIntegrationMessageBuilder<?> builder = null;
				if (reply instanceof Message) {
					builder = this.getMessageBuilderFactory().fromMessage((Message<?>) reply);
				}
				else if (reply instanceof AbstractIntegrationMessageBuilder) {
					builder = (AbstractIntegrationMessageBuilder<?>) reply;
				}
				else {
					builder = this.getMessageBuilderFactory().withPayload(reply);
				}
				builder.setHeader(IntegrationMessageHeaderAccessor.ROUTING_SLIP,
						Collections.singletonMap(routingSlip, routingSlipIndex.get()));
				reply = builder;
			}
		}

		if (replyChannel == null) {
			replyChannel = requestHeaders.getReplyChannel();
		}
	}

	if (this.async && reply instanceof ListenableFuture<?>) {
		ListenableFuture<?> future = (ListenableFuture<?>) reply;
		final Object theReplyChannel = replyChannel;
		future.addCallback(new ListenableFutureCallback<Object>() {

			@Override
			public void onSuccess(Object result) {
				Message<?> replyMessage = null;
				try {
					replyMessage = createOutputMessage(result, requestHeaders);
					sendOutput(replyMessage, theReplyChannel, false);
				}
				catch (Exception e) {
					Exception exceptionToLogAndSend = e;
					if (!(e instanceof MessagingException)) {
						exceptionToLogAndSend = new MessageHandlingException(requestMessage, e);
						if (replyMessage != null) {
							exceptionToLogAndSend = new MessagingException(replyMessage, exceptionToLogAndSend);
						}
					}
					logger.error("Failed to send async reply: " + result.toString(), exceptionToLogAndSend);
					onFailure(exceptionToLogAndSend);
				}
			}

			@Override
			public void onFailure(Throwable ex) {
				sendErrorMessage(requestMessage, ex);
			}
		});
	} else {
		// 
		sendOutput(createOutputMessage(reply, requestHeaders), replyChannel, false);
	}
} // end produceOutput

// 3. 根据channel名称(outputChannelName),从Spring容器里获得:MessageChannel对象
public MessageChannel getOutputChannel() {
	if (this.outputChannelName != null) {
		synchronized (this) {
			if (this.outputChannelName != null) {
				this.outputChannel = getChannelResolver().resolveDestination(this.outputChannelName);
				this.outputChannelName = null;
			}
		}
	}
	return this.outputChannel;
} // end getOutputChannel


// 4. 最终发送消息的方法
protected void sendOutput(Object output, Object replyChannel, boolean useArgChannel) {
	// 从Spring容器中,获得:MessageChannel
	MessageChannel outputChannel = getOutputChannel();
	if (!useArgChannel && outputChannel != null) {
		replyChannel = outputChannel;
	}
	
	if (replyChannel == null) {
		throw new DestinationResolutionException("no output-channel or replyChannel header available");
	}

	if (replyChannel instanceof MessageChannel) {
		if (output instanceof Message<?>) {
			this.messagingTemplate.send((MessageChannel) replyChannel, (Message<?>) output);
		} else {
			this.messagingTemplate.convertAndSend((MessageChannel) replyChannel, output);
		}
	} else if (replyChannel instanceof String) {
		if (output instanceof Message<?>) {
			// **********************************************************************************
			// 通过MessagingTemplate发送消息
			// **********************************************************************************
			this.messagingTemplate.send((String) replyChannel, (Message<?>) output);
		} else {
			// **********************************************************************************
			// 通过MessagingTemplate发送消息
			// **********************************************************************************
			this.messagingTemplate.convertAndSend((String) replyChannel, output);
		}
	} else {
		throw new MessagingException("replyChannel must be a MessageChannel or String");
	}
} // end sendOutput

(7). MessageHandler

其实,经过分析,我们能发现:MessageHandler承载着消息的中转过程,那么,MessageHandler的实现有哪些实现类呢?通过IDEA稍微看了下,基本大部份的业务组件,都是MessageHandler的实现(包括路由),所以,理解了一个,余下的应该能很好的理解了.

"MessageHandler"

(8). 总结

ServiceActivatingHandler的目的是被动接受消息,并把消息往MessageChannel(outputChannel)里发送.