(1). Overview

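Spring Boot wraps Kafka so thoroughly that all it leaves us is a single annotation. This short post dissects the @KafkaListener annotation, with the aim of understanding which method parameter signatures it supports.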

(2). Where do we start?

# At startup, Spring Boot loads and processes every class listed under the EnableAutoConfiguration key
# spring-boot-autoconfigure.jar/META-INF/spring.factories

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration
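
To make the format of this entry concrete, here is a minimal sketch that reads a spring.factories-style snippet with java.util.Properties and splits the value on commas. The class FactoriesSketch and its helper are hypothetical names used only for illustration; Spring's own loader additionally scans every jar on the classpath and caches the result.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class FactoriesSketch {

    static final String KEY = "org.springframework.boot.autoconfigure.EnableAutoConfiguration";

    // Hypothetical helper: returns the class names listed under the given key.
    static List<String> configuredClassNames(String factoriesContent, String key) {
        Properties props = new Properties();
        try {
            // Properties already understands the trailing "\" line continuation.
            props.load(new StringReader(factoriesContent));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        List<String> names = new ArrayList<>();
        for (String name : props.getProperty(key, "").split(",")) {
            if (!name.trim().isEmpty()) {
                names.add(name.trim());
            }
        }
        return names;
    }

    public static void main(String[] args) {
        String content = KEY + "=\\\n"
                + "org.springframework.boot.autoconfigure.kafka.KafkaAutoConfiguration\n";
        System.out.println(configuredClassNames(content, KEY));
    }
}
```

Each name returned this way is a candidate configuration class, which is why KafkaAutoConfiguration is the natural entry point for the rest of the walk-through.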

(3). KafkaAutoConfiguration

@Import({ 
	// **************************************************
	// 1. Imports another configuration class: KafkaAnnotationDrivenConfiguration
	// **************************************************
	KafkaAnnotationDrivenConfiguration.class,
	KafkaStreamsAnnotationDrivenConfiguration.class })
public class KafkaAutoConfiguration {
	// ... ...
}

(4). KafkaAnnotationDrivenConfiguration

@Configuration
@ConditionalOnClass(EnableKafka.class)
class KafkaAnnotationDrivenConfiguration {
	
	@Configuration
	// *******************************************************************
	// 1. Enables the @EnableKafka annotation
	// *******************************************************************
	@EnableKafka
	@ConditionalOnMissingBean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	protected static class EnableKafkaConfiguration {

	}
	
}	

(5). @EnableKafka

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
// *********************************************************
// Imports yet another configuration class: KafkaBootstrapConfiguration
// *********************************************************
@Import(KafkaBootstrapConfiguration.class)
public @interface EnableKafka {
	
}

(6). KafkaBootstrapConfiguration

@Configuration
public class KafkaBootstrapConfiguration {

	// **************************************************************************
	// KafkaListenerAnnotationBeanPostProcessor
	// **************************************************************************
	@Bean(name = KafkaListenerConfigUtils.KAFKA_LISTENER_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationProcessor() {
		return new KafkaListenerAnnotationBeanPostProcessor();
	}

	// ... ...
}
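
The chain traced in sections (3) to (6) is a sequence of imports, one of them hidden behind a meta-annotation. The sketch below follows such a chain with plain reflection. It is a simplified model, not Spring's implementation: @Imports, @EnableSketch and the three empty classes are hypothetical stand-ins for @Import, @EnableKafka and the real configuration classes.

```java
import java.lang.annotation.Annotation;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class ImportChainSketch {

    // Hypothetical stand-in for Spring's @Import.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface Imports {
        Class<?>[] value();
    }

    // Stand-in for @EnableKafka: an annotation that itself carries an import.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Imports(BootstrapConfig.class)
    @interface EnableSketch {
    }

    static class BootstrapConfig { }        // plays KafkaBootstrapConfiguration

    @EnableSketch
    static class AnnotationDrivenConfig { } // plays KafkaAnnotationDrivenConfiguration

    @Imports(AnnotationDrivenConfig.class)
    static class AutoConfig { }             // plays KafkaAutoConfiguration

    // Returns the configuration classes reachable from root, in discovery order.
    static List<Class<?>> collect(Class<?> root) {
        Set<Class<?>> found = new LinkedHashSet<>();
        visit(root, found);
        return new ArrayList<>(found);
    }

    private static void visit(Class<?> config, Set<Class<?>> found) {
        if (!found.add(config)) {
            return; // already processed
        }
        List<Class<?>> imports = new ArrayList<>();
        collectImports(config, new HashSet<>(), imports);
        for (Class<?> imported : imports) {
            visit(imported, found);
        }
    }

    // Looks for @Imports directly on the type or on any of its meta-annotations.
    private static void collectImports(Class<?> type, Set<Class<?>> seen, List<Class<?>> out) {
        for (Annotation ann : type.getAnnotations()) {
            if (ann instanceof Imports) {
                out.addAll(Arrays.asList(((Imports) ann).value()));
            } else if (seen.add(ann.annotationType())) {
                collectImports(ann.annotationType(), seen, out);
            }
        }
    }

    public static void main(String[] args) {
        for (Class<?> c : collect(AutoConfig.class)) {
            System.out.println(c.getSimpleName());
        }
    }
}
```

The `seen` set matters because JDK meta-annotations such as @Retention and @Target annotate each other, so a naive recursion would never terminate.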

(7). KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization

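Reading the source shows that postProcessAfterInitialization collects the annotations, converts each one into the model object MethodKafkaListenerEndpoint, and stores all the results in KafkaListenerEndpointRegistrar.
That raises the next question: once a message arrives, how does the lower layer dispatch it back to the MethodKafkaListenerEndpoint?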

public class KafkaListenerAnnotationBeanPostProcessor<K, V>
        // ***********************************************************
		// BeanPostProcessor has two callbacks; Spring invokes them for every bean it creates.
		// We focus on BeanPostProcessor.postProcessAfterInitialization, the same hook Spring uses to weave AOP proxies.
		// ***********************************************************
		implements BeanPostProcessor, 
		           Ordered, 
				   // BeanFactoryAware asks Spring to inject the BeanFactory
				   BeanFactoryAware, 
				   SmartInitializingSingleton {

	
	// *************************************************************
	// 1. Core Spring callback method
	// *************************************************************
	@Override
	public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
		// 2. Every bean passes through this method, so classes already known to carry no annotations are skipped to avoid repeating fruitless reflection.
		if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
			// 3. Resolve the bean's target class (unwrapping any AOP proxy)
			Class<?> targetClass = AopUtils.getTargetClass(bean);
			
			// 4. Find class-level annotations (@KafkaListener/@KafkaListeners)
			Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
			final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
			
			// 5. Find method-level annotations (@KafkaListener/@KafkaListeners)
			final List<Method> multiMethods = new ArrayList<Method>();
			Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
				new MethodIntrospector.MetadataLookup<Set<KafkaListener>>() {

					@Override
					public Set<KafkaListener> inspect(Method method) {
						Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
						return (!listenerMethods.isEmpty() ? listenerMethods : null);
					}

			}); // end annotatedMethods
			
			// 6. When class-level listeners exist, collect the methods annotated with @KafkaHandler.
			if (hasClassLevelListeners) {
				Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
						(ReflectionUtils.MethodFilter) method ->
								AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
				multiMethods.addAll(methodsWithHandler);
			}
			
			// 7. When no method carries the annotation, record the class in nonAnnotatedClasses so it is not inspected again.
			if (annotatedMethods.isEmpty()) {
				this.nonAnnotatedClasses.add(bean.getClass());
				if (this.logger.isTraceEnabled()) {
					this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
				}
			}
			else {
				// *******************************************************************
				// 8. Process the method-level annotations
				// *******************************************************************
				// Non-empty set of methods
				for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
					// key = Method
					// value = @KafkaListener
					Method method = entry.getKey();
					for (KafkaListener listener : entry.getValue()) {
						// ************************************************************
						// 9. Process each @KafkaListener
						// ************************************************************
						processKafkaListener(listener, method, bean, beanName);
					}
				}
				
				if (this.logger.isDebugEnabled()) {
					this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
							+ beanName + "': " + annotatedMethods);
				}
			}
			
			// 10. Process the class-level annotations.
			if (hasClassLevelListeners) {
				processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
			}
		}
		return bean;
	} // end postProcessAfterInitialization

	// Collect the class-level annotations (@KafkaListener/@KafkaListeners)
	// As the code shows, several listeners can be declared on the same element.
	private Collection<KafkaListener> findListenerAnnotations(Class<?> clazz) {
		Set<KafkaListener> listeners = new HashSet<KafkaListener>();
		KafkaListener ann = AnnotationUtils.findAnnotation(clazz, KafkaListener.class);
		if (ann != null) {
			listeners.add(ann);
		}
		KafkaListeners anns = AnnotationUtils.findAnnotation(clazz, KafkaListeners.class);
		if (anns != null) {
			listeners.addAll(Arrays.asList(anns.value()));
		}
		return listeners;
	} // end findListenerAnnotations
	
	// **************************************************************
	// 9.1 Process @KafkaListener
	// **************************************************************
	protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
		Method methodToUse = checkProxy(method, bean);
		
		// Start parsing @KafkaListener; MethodKafkaListenerEndpoint holds the result: the Method, the BeanFactory and the KafkaListenerErrorHandler
		MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<K, V>();
		endpoint.setMethod(methodToUse);
		endpoint.setBeanFactory(this.beanFactory);
		
		// Resolve the error handler bean name declared on the annotation, e.g. @KafkaListener(errorHandler = "${errorHandler}").
		String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
		if (StringUtils.hasText(errorHandlerBeanName)) {
			endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
		}
		
		// ***************************************************************
		// 9.2 Delegate to processListener for the rest of the @KafkaListener handling
		// ***************************************************************
		processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
	} // end processKafkaListener
	
	// ***************************************************************
	// 9.3 processListener
	// ***************************************************************
	protected void processListener(
	             MethodKafkaListenerEndpoint<?, ?> endpoint, 
				 KafkaListener kafkaListener, 
				 Object bean,
				Object adminTarget, 
				String beanName) {
		
		
		String beanRef = kafkaListener.beanRef();
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.addListener(beanRef, bean);
		}
		
		// ********************************************************************
		// MethodKafkaListenerEndpoint carries the parsed result of @KafkaListener; it is the model object for the annotation
		// ********************************************************************
		endpoint.setBean(bean);
		endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
		endpoint.setId(getEndpointId(kafkaListener));
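		// Resolve groupId; SpEL expressions and property placeholders are supported
		endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
		// Resolve topicPartitions (explicit topic/partition assignments); the topic value supports expressions
		endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
		// Resolve topics; expressions are supported
		endpoint.setTopics(resolveTopics(kafkaListener));
		// Resolve topicPattern; expressions are supported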
		endpoint.setTopicPattern(resolvePattern(kafkaListener));
		endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(),
				"clientIdPrefix"));
		String group = kafkaListener.containerGroup();
		if (StringUtils.hasText(group)) {
			Object resolvedGroup = resolveExpression(group);
			if (resolvedGroup instanceof String) {
				endpoint.setGroup((String) resolvedGroup);
			}
		}
		
		// ******************************************************
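		// Resolve the concurrency for this listener; it is recommended to match the number of partitions.
		// This explains why concurrency is a String on the annotation rather than a numeric type: it allows the value to be resolved dynamically from an expression or placeholder.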
		// ******************************************************
		String concurrency = kafkaListener.concurrency();
		if (StringUtils.hasText(concurrency)) {
			endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
		}
		
		
		String autoStartup = kafkaListener.autoStartup();
		if (StringUtils.hasText(autoStartup)) {
			endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
		}

		// Resolve containerFactory (KafkaListenerContainerFactory)
		KafkaListenerContainerFactory<?> factory = null;
		String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
		if (StringUtils.hasText(containerFactoryBeanName)) {
			Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
			try {
				factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
			}
			catch (NoSuchBeanDefinitionException ex) {
				throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
						+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
						+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
			}
		}
		
		// ****************************************************************
		// So far the @KafkaListener information has only been converted into a MethodKafkaListenerEndpoint model object and stored in the registrar.
		// ****************************************************************
		this.registrar.registerEndpoint(endpoint, factory);
		if (StringUtils.hasText(beanRef)) {
			this.listenerScope.removeListener(beanRef);
		}
	} // end processListener
}
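
The scan-and-convert flow above can be modelled in a few dozen lines of plain Java. The sketch below is a simplified illustration under stated assumptions, not Spring's code: @Listen stands in for @KafkaListener, Endpoint for MethodKafkaListenerEndpoint, a plain list for the registrar, and a Map lookup of `${...}` for Spring's expression resolution. All of these names are hypothetical.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class ListenerScanSketch {

    // Hypothetical stand-in for @KafkaListener, reduced to two attributes.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {
        String[] topics();
        String concurrency() default ""; // a String so that placeholders can be used
    }

    // Stand-in for MethodKafkaListenerEndpoint: the parsed form of one annotation.
    static final class Endpoint {
        final Object bean;
        final Method method;
        final List<String> topics;
        final Integer concurrency; // null when not configured

        Endpoint(Object bean, Method method, List<String> topics, Integer concurrency) {
            this.bean = bean;
            this.method = method;
            this.topics = topics;
            this.concurrency = concurrency;
        }
    }

    // Example bean with one annotated method.
    static class OrderHandler {
        @Listen(topics = "orders", concurrency = "${orders.concurrency}")
        public void onOrder(String payload) { }
    }

    private final Set<Class<?>> nonAnnotatedClasses = new HashSet<>();
    private final List<Endpoint> endpoints = new ArrayList<>(); // plays the registrar
    private final Map<String, String> properties;

    ListenerScanSketch(Map<String, String> properties) {
        this.properties = properties;
    }

    List<Endpoint> endpoints() {
        return endpoints;
    }

    // Called once per bean, like postProcessAfterInitialization.
    Object postProcess(Object bean) {
        Class<?> type = bean.getClass();
        if (nonAnnotatedClasses.contains(type)) {
            return bean; // known to have no annotated methods
        }
        boolean found = false;
        for (Method method : type.getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null) {
                found = true;
                endpoints.add(new Endpoint(bean, method,
                        Arrays.asList(listen.topics()), resolveInteger(listen.concurrency())));
            }
        }
        if (!found) {
            nonAnnotatedClasses.add(type);
        }
        return bean;
    }

    // Resolves "3" or "${some.key}" to an Integer; an empty string means "not set".
    private Integer resolveInteger(String raw) {
        if (raw.isEmpty()) {
            return null;
        }
        String value = raw;
        if (raw.startsWith("${") && raw.endsWith("}")) {
            value = properties.get(raw.substring(2, raw.length() - 1));
            if (value == null) {
                throw new IllegalArgumentException("Unresolved placeholder: " + raw);
            }
        }
        return Integer.valueOf(value);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("orders.concurrency", "3");
        ListenerScanSketch scanner = new ListenerScanSketch(props);
        scanner.postProcess(new OrderHandler());
        scanner.postProcess("a bean without listeners");
        System.out.println(scanner.endpoints().size() + " endpoint(s) collected");
    }
}
```

Note that nothing is started here: as in the real post-processor, scanning only produces model objects and hands them to the registrar.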

(8). KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated

// afterSingletonsInstantiated is a Spring callback (SmartInitializingSingleton), invoked once all singletons have been instantiated
public void afterSingletonsInstantiated() {
	
	this.registrar.setBeanFactory(this.beanFactory);
	
	
	// Invoke every KafkaListenerConfigurer implementation found in the Spring container.
	if (this.beanFactory instanceof ListableBeanFactory) {
		Map<String, KafkaListenerConfigurer> instances =
				((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
		for (KafkaListenerConfigurer configurer : instances.values()) {
			configurer.configureKafkaListeners(this.registrar);
		}
	}

	// ... ...
	// ********************************************************************
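	// The previous method only converted the annotations into model objects and stored them in the registrar.
	// Here we can see that registrar.afterPropertiesSet is called to perform the actual initialization.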
	// ********************************************************************
	// Actually register all listeners
	this.registrar.afterPropertiesSet();
}
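
The order of operations in this callback is worth spelling out: configurers get a chance to add endpoints programmatically, and only then are all collected endpoints registered. The sketch below models that ordering in plain Java. Registrar and Configurer are hypothetical stand-ins for KafkaListenerEndpointRegistrar and KafkaListenerConfigurer, and endpoints are reduced to string ids.

```java
import java.util.ArrayList;
import java.util.List;

public class ConfigurerHookSketch {

    // Hypothetical stand-in for the registrar: it only records endpoint ids.
    static class Registrar {
        final List<String> pending = new ArrayList<>();
        final List<String> registered = new ArrayList<>();

        void registerEndpoint(String id) {
            pending.add(id);
        }

        // Moves everything collected so far into the "registered" list.
        void afterPropertiesSet() {
            registered.addAll(pending);
            pending.clear();
        }
    }

    // Stand-in for a configurer callback that adds endpoints programmatically.
    interface Configurer {
        void configure(Registrar registrar);
    }

    // Mirrors the shape of afterSingletonsInstantiated: configurers first, then registration.
    static List<String> afterSingletonsInstantiated(Registrar registrar, List<Configurer> configurers) {
        for (Configurer configurer : configurers) {
            configurer.configure(registrar);
        }
        registrar.afterPropertiesSet();
        return registrar.registered;
    }

    public static void main(String[] args) {
        Registrar registrar = new Registrar();
        registrar.registerEndpoint("annotated-endpoint"); // collected during post-processing
        List<Configurer> configurers = new ArrayList<>();
        configurers.add(r -> r.registerEndpoint("programmatic-endpoint"));
        System.out.println(afterSingletonsInstantiated(registrar, configurers));
    }
}
```

Because registration happens after the configurer loop, annotated and programmatically added endpoints end up in the same batch.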

(9). KafkaListenerEndpointRegistrar.afterPropertiesSet

public class KafkaListenerEndpointRegistrar implements BeanFactoryAware, InitializingBean {
	
	public void afterPropertiesSet() {
		// ****************************************
		// Delegates to registerAllEndpoints
		// ****************************************
		registerAllEndpoints();
	}
		
	protected void registerAllEndpoints() {
		// endpointDescriptors wraps the KafkaListenerEndpoint instances
		synchronized (this.endpointDescriptors) {
			for (KafkaListenerEndpointDescriptor descriptor : this.endpointDescriptors) {
				// ************************************************************
				// Delegates again, to endpointRegistry.registerListenerContainer
				// ************************************************************
				this.endpointRegistry.registerListenerContainer(descriptor.endpoint, resolveContainerFactory(descriptor));
			}
			// Mark initialization as complete
			this.startImmediately = true;  // trigger immediate startup
		}
	}
	
}	
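
The startImmediately flag, together with its "trigger immediate startup" comment, suggests a two-mode registrar: endpoints that arrive before initialization are queued, and endpoints that arrive afterwards go straight to the registry. The late-registration branch is not shown in the listing above, so treat the sketch below as an assumption about that behaviour. All class names are hypothetical and endpoints are reduced to string ids.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredRegistrarSketch {

    // Stand-in for KafkaListenerEndpointRegistry: it only logs what it is asked to do.
    static class Registry {
        final List<String> log = new ArrayList<>();

        void registerListenerContainer(String endpointId, boolean startImmediately) {
            log.add(endpointId + (startImmediately ? ":started" : ":deferred"));
        }
    }

    // Stand-in for KafkaListenerEndpointRegistrar.
    static class Registrar {
        private final Registry registry;
        private final List<String> endpointDescriptors = new ArrayList<>();
        private boolean startImmediately;

        Registrar(Registry registry) {
            this.registry = registry;
        }

        void registerEndpoint(String endpointId) {
            synchronized (endpointDescriptors) {
                if (startImmediately) {
                    // Assumed branch: after initialization, register and start right away.
                    registry.registerListenerContainer(endpointId, true);
                } else {
                    endpointDescriptors.add(endpointId);
                }
            }
        }

        void afterPropertiesSet() {
            synchronized (endpointDescriptors) {
                for (String endpointId : endpointDescriptors) {
                    registry.registerListenerContainer(endpointId, false);
                }
                startImmediately = true; // later registrations skip the queue
            }
        }
    }

    public static void main(String[] args) {
        Registry registry = new Registry();
        Registrar registrar = new Registrar(registry);
        registrar.registerEndpoint("early");
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("late");
        System.out.println(registry.log);
    }
}
```

Queueing first keeps container creation out of the bean post-processing phase, when container factories and other dependencies may not be fully initialized yet.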

(10). KafkaListenerEndpointRegistry.registerListenerContainer

public class KafkaListenerEndpointRegistry 
       implements DisposableBean, SmartLifecycle, ApplicationContextAware, ApplicationListener<ContextRefreshedEvent> {
    
	// Creates the listener container through the KafkaListenerContainerFactory.
	public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
		registerListenerContainer(endpoint, factory, false);
	} // end registerListenerContainer
	
	public void registerListenerContainer(
	          KafkaListenerEndpoint endpoint, 
			  KafkaListenerContainerFactory<?> factory,
			  // false
			  boolean startImmediately) {
		Assert.notNull(endpoint, "Endpoint must not be null");
		Assert.notNull(factory, "Factory must not be null");

		String id = endpoint.getId();
		Assert.hasText(id, "Endpoint id must not be empty");
		synchronized (this.listenerContainers) {
			Assert.state(!this.listenerContainers.containsKey(id),
					"Another endpoint is already registered with id '" + id + "'");
            // *******************************************************************
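			// This step is important: it converts the model object (KafkaListenerEndpoint) into a MessageListenerContainer.
			// MessageListenerContainer extends org.springframework.context.SmartLifecycle.
			// Note that the container is only stored in the listenerContainers map here; the registry itself
			// implements SmartLifecycle, so starting the registry is what starts the containers.
			// The method to study next is therefore MessageListenerContainer.start.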
			// *******************************************************************
			MessageListenerContainer container = createListenerContainer(endpoint, factory);
			this.listenerContainers.put(id, container);
			if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
				List<MessageListenerContainer> containerGroup;
				if (this.applicationContext.containsBean(endpoint.getGroup())) {
					containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
				} else {
					containerGroup = new ArrayList<MessageListenerContainer>();
					// *********************************************************************************
					// Register the container group list as a singleton bean in the Spring container
					// *********************************************************************************
					this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
				}
				containerGroup.add(container);
			}
			if (startImmediately) {
				startIfNecessary(container);
			}
		}
	} // end registerListenerContainer
}			
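
The registry's job can be summarized as: keep one container per endpoint id, reject duplicates, and start containers either immediately or when its own lifecycle start is triggered. The sketch below models that in plain Java. It assumes that the registry's own start callback is what starts the queued containers, which the listing above implies through SmartLifecycle but does not show. RegistrySketch and Container are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RegistrySketch {

    // Stand-in for MessageListenerContainer: it only tracks whether it is running.
    static class Container {
        final String id;
        boolean running;

        Container(String id) {
            this.id = id;
        }

        void start() {
            running = true;
        }
    }

    private final Map<String, Container> listenerContainers = new LinkedHashMap<>();

    // Mirrors the shape of registerListenerContainer(endpoint, factory, startImmediately).
    void register(String id, boolean startImmediately) {
        synchronized (listenerContainers) {
            if (listenerContainers.containsKey(id)) {
                throw new IllegalStateException(
                        "Another endpoint is already registered with id '" + id + "'");
            }
            Container container = new Container(id);
            listenerContainers.put(id, container);
            if (startImmediately) {
                container.start();
            }
        }
    }

    // Assumed lifecycle callback: starting the registry starts every container it holds.
    void start() {
        synchronized (listenerContainers) {
            for (Container container : listenerContainers.values()) {
                container.start();
            }
        }
    }

    boolean isRunning(String id) {
        synchronized (listenerContainers) {
            Container container = listenerContainers.get(id);
            return container != null && container.running;
        }
    }

    public static void main(String[] args) {
        RegistrySketch registry = new RegistrySketch();
        registry.register("orders", false);
        System.out.println("before start: " + registry.isRunning("orders"));
        registry.start();
        System.out.println("after start: " + registry.isRunning("orders"));
    }
}
```

Keying the map by endpoint id is what makes the duplicate check possible, which is why every endpoint must have a non-empty id.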

(11). Summary

# 1.  KafkaListenerAnnotationBeanPostProcessor.postProcessAfterInitialization collects every @KafkaListener annotation, converts each into an intermediate model object (KafkaListenerEndpoint), and stores the results in KafkaListenerEndpointRegistrar.
# 2.  KafkaListenerAnnotationBeanPostProcessor.afterSingletonsInstantiated triggers KafkaListenerEndpointRegistrar.afterPropertiesSet, which processes all the model objects.
# 3.  KafkaListenerEndpointRegistrar.afterPropertiesSet iterates over every KafkaListenerEndpoint and delegates to KafkaListenerEndpointRegistry.registerListenerContainer.
# 4.  KafkaListenerEndpointRegistry.registerListenerContainer converts each KafkaListenerEndpoint into a MessageListenerContainer.
# 5.  MessageListenerContainer is an interface; the usual implementation is org.springframework.kafka.listener.ConcurrentMessageListenerContainer.
# MessageListenerContainer also extends org.springframework.context.SmartLifecycle. The containers are held by
# KafkaListenerEndpointRegistry, which itself implements SmartLifecycle, so when Spring starts the registry, MessageListenerContainer.start() is called and the container begins listening for Kafka messages.