(1). 概述

在前面,有剖析过:ServiceActivatorParser会,向Spring中注册两个Bean(ConsumerEndpointFactoryBean/ServiceActivatorFactoryBean),在这一小篇,主要剖析:ConsumerEndpointFactoryBean.

(2). ConsumerEndpointFactoryBean

public class ConsumerEndpointFactoryBean
		//  ***************************************************************
		// 1. FactoryBean所以,需要在意:getObject
		//  ***************************************************************
		implements FactoryBean<AbstractEndpoint>, 
		           BeanFactoryAware, 
				   BeanNameAware, 
				   BeanClassLoaderAware,
				   //  ***************************************************************
				   // 2. InitializingBean所以,需要在意:afterPropertiesSet
                   //  ***************************************************************
				   InitializingBean, 
				   SmartLifecycle {
    
	@Override
	public AbstractEndpoint getObject() throws Exception {
		if (!this.initialized) {
			//  ***************************************************************
			// 3. 初始化,并且,确保只实例化一次
			//  ***************************************************************
			this.initializeEndpoint();
		}
		return this.endpoint;
	} // end getObject
	
	
	private void initializeEndpoint() throws Exception {
		synchronized (this.initializationMonitor) {
			if (this.initialized) {
				return;
			}
			
			
			MessageChannel channel = null;
			if (StringUtils.hasText(this.inputChannelName)) {
				// 根据channel名称(inputChannelName)从Spring容器中,获得:MessageChannel
				channel = this.channelResolver.resolveDestination(this.inputChannelName);
			}
			if (this.inputChannel != null) {
				channel = this.inputChannel;
			}
			
			Assert.state(channel != null, "one of inputChannelName or inputChannel is required");
			// 判断是否为:SubscribableChannel的实现类
			if (channel instanceof SubscribableChannel) {  // true
				Assert.isNull(this.pollerMetadata, "A poller should not be specified for endpoint '" + this.beanName + "', since '" + channel + "' is a SubscribableChannel (not pollable).");
				
				// *************************************************************************************
				// 创建:EventDrivenConsumer
				// 要注意:
				//    此处的:handler就是前面分析:ServiceActivator后产生的:ServiceActivatingHandler.  
				//    此处的:channel就是xml配置的inputChannel.
				// *************************************************************************************
				this.endpoint = new EventDrivenConsumer((SubscribableChannel) channel, this.handler);
				
				// 打印日志
				if (this.logger.isWarnEnabled() && Boolean.FALSE.equals(this.autoStartup) && channel instanceof FixedSubscriberChannel) {
					this.logger.warn("'autoStartup=\"false\"' has no effect when using a FixedSubscriberChannel");
				}
			} else if (channel instanceof PollableChannel) { // PollableChannel应该是poll的实现吧
				PollingConsumer pollingConsumer = new PollingConsumer((PollableChannel) channel, this.handler);
				if (this.pollerMetadata == null) {
					this.pollerMetadata = PollerMetadata.getDefaultPollerMetadata(this.beanFactory);
					Assert.notNull(this.pollerMetadata, "No poller has been defined for endpoint '" + this.beanName
							+ "', and no default poller is available within the context.");
				}
				pollingConsumer.setTaskExecutor(this.pollerMetadata.getTaskExecutor());
				pollingConsumer.setTrigger(this.pollerMetadata.getTrigger());
				pollingConsumer.setAdviceChain(this.pollerMetadata.getAdviceChain());
				pollingConsumer.setMaxMessagesPerPoll(this.pollerMetadata.getMaxMessagesPerPoll());

				pollingConsumer.setErrorHandler(this.pollerMetadata.getErrorHandler());

				pollingConsumer.setReceiveTimeout(this.pollerMetadata.getReceiveTimeout());
				pollingConsumer.setTransactionSynchronizationFactory(
						this.pollerMetadata.getTransactionSynchronizationFactory());
				pollingConsumer.setBeanClassLoader(this.beanClassLoader);
				pollingConsumer.setBeanFactory(this.beanFactory);
				this.endpoint = pollingConsumer;
			} else {
				throw new IllegalArgumentException("unsupported channel type: [" + channel.getClass() + "]");
			}
			
			this.endpoint.setBeanName(this.beanName);
			this.endpoint.setBeanFactory(this.beanFactory);
			if (this.autoStartup != null) {
				this.endpoint.setAutoStartup(this.autoStartup);
			}
			
			int phase = this.phase;
			if (!this.isPhaseSet && this.endpoint instanceof PollingConsumer) {
				phase = Integer.MAX_VALUE / 2;
			}
			
			this.endpoint.setPhase(phase);
			
			this.endpoint.afterPropertiesSet();
			this.initialized = true;
		}
	}// end initializeEndpoint

}

(3). 总结

ConsumerEndpointFactoryBean底层,又委托给了:EventDrivenConsumer类,而:EventDrivenConsumer的构造器关联着inputChannel与MessageHandler(ServiceActivator).