(1). Overview

In the previous post we integrated Spring Boot with Kafka, with the configuration simply pulled from the official documentation. To understand in depth which configuration items are actually available, we need to look at the underlying configuration model: Spring Boot's KafkaProperties class.

(2). KafkaProperties

@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
	
	// List of Kafka servers; multiple entries can be configured
	private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));

	// ID to pass to the server when making requests; used for server-side logging
	private String clientId;

	// Additional properties, common to producers and consumers, used to configure the client
	private final Map<String, String> properties = new HashMap<>();

	// Consumer configuration
	private final Consumer consumer = new Consumer();

	// Producer configuration
	private final Producer producer = new Producer();

	// Admin configuration
	private final Admin admin = new Admin();

	// Kafka Streams configuration
	private final Streams streams = new Streams();

	// Listener container configuration
	private final Listener listener = new Listener();

	// SSL configuration
	private final Ssl ssl = new Ssl();

	// JAAS configuration
	private final Jaas jaas = new Jaas();

	// KafkaTemplate configuration
	private final Template template = new Template();
}
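
Each field above binds to a key under the spring.kafka prefix, and the nested objects (consumer, producer, admin, streams, listener, ssl, jaas, template) become sub-prefixes. A minimal application.properties sketch (the values here are only illustrative):

spring.kafka.bootstrap-servers=localhost:9092
# Illustrative client id
spring.kafka.client-id=demo-client
# Entries under "properties" are passed straight through to the Kafka client
spring.kafka.properties.request.timeout.ms=30000
# Nested objects map to sub-prefixes such as spring.kafka.consumer.*, spring.kafka.producer.*, spring.kafka.listener.*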

(3). Consumer

public static class Consumer {

	private final Ssl ssl = new Ssl();

	/**
	 * Frequency with which the consumer offsets are auto-committed to Kafka if
	 * 'enable.auto.commit' is set to true.
	 */
	private Duration autoCommitInterval;

	/**
	 * What to do when there is no initial offset in Kafka or if the current offset no
	 * longer exists on the server.
	 */
	private String autoOffsetReset;

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connections to the Kafka cluster. Overrides the global property, for consumers.
	 */
	private List<String> bootstrapServers;

	/**
	 * ID to pass to the server when making requests. Used for server-side logging.
	 */
	private String clientId;

	/**
	 * Whether to enable auto-commit of offsets, i.e. whether the consumer's offset is
	 * periodically committed in the background.
	 */
	private Boolean enableAutoCommit;

	/**
	 * Maximum amount of time the server blocks before answering the fetch request if
	 * there isn't sufficient data to immediately satisfy the requirement given by
	 * "fetch-min-size".
	 */
	private Duration fetchMaxWait;

	/**
	 * Minimum amount of data the server should return for a fetch request.
	 */
	private DataSize fetchMinSize;

	/**
	 * Consumer group id: unique string that identifies the consumer group to which
	 * this consumer belongs.
	 */
	private String groupId;

	/**
	 * Expected time between heartbeats to the consumer coordinator.
	 */
	private Duration heartbeatInterval;

	/**
	 * Deserializer class for keys.
	 */
	private Class<?> keyDeserializer = StringDeserializer.class;

	/**
	 * Deserializer class for values.
	 */
	private Class<?> valueDeserializer = StringDeserializer.class;

	/**
	 * Maximum number of records returned in a single call to poll().
	 */
	private Integer maxPollRecords;

	/**
	 * Additional consumer-specific properties used to configure the client.
	 */
	private final Map<String, String> properties = new HashMap<>();
}		
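
These fields bind under the spring.kafka.consumer prefix (relaxed binding turns the camelCase field names into kebab-case keys). A minimal sketch with illustrative values:

spring.kafka.consumer.group-id=demo-group
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.max-poll-records=500
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
# Any consumer setting without a dedicated field goes into "properties"
spring.kafka.consumer.properties.max.partition.fetch.bytes=1048576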

(4). Producer

public static class Producer {

	private final Ssl ssl = new Ssl();

	/**
	 * Producer acks: number of acknowledgments the producer requires the leader to
	 * have received before considering a request complete.
	 */
	private String acks;

	/**
	 * Default batch size. A small batch size will make batching less common and may
	 * reduce throughput (a batch size of zero disables batching entirely).
	 */
	private DataSize batchSize;

	/**
	 * Comma-delimited list of host:port pairs to use for establishing the initial
	 * connections to the Kafka cluster. Overrides the global property, for producers.
	 */
	private List<String> bootstrapServers;

	/**
	 * Producer buffer size: total memory the producer can use to buffer records
	 * waiting to be sent to the server.
	 */
	private DataSize bufferMemory;

	/**
	 * ID to pass to the server when making requests. Used for server-side logging.
	 */
	private String clientId;

	/**
	 * Compression type for all data generated by the producer (e.g. gzip, snappy,
	 * lz4 or none).
	 */
	private String compressionType;

	/**
	 * Serializer class for keys.
	 */
	private Class<?> keySerializer = StringSerializer.class;

	/**
	 * Serializer class for values.
	 */
	private Class<?> valueSerializer = StringSerializer.class;

	/**
	 * Number of retries: when greater than zero, enables retrying of failed sends.
	 */

	/**
	 * When non-empty, enables transaction support for the producer.
	 */

	/**
	 * Additional producer-specific properties used to configure the client.
	 */
	private final Map<String, String> properties = new HashMap<>();
}		
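
The producer fields bind the same way under spring.kafka.producer; note that batch-size and buffer-memory are DataSize values, so size suffixes such as KB/MB are accepted. A minimal sketch with illustrative values:

spring.kafka.producer.acks=all
spring.kafka.producer.retries=3
spring.kafka.producer.batch-size=16KB
spring.kafka.producer.buffer-memory=32MB
spring.kafka.producer.compression-type=lz4
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
# Setting a non-empty prefix enables transactional sends
#spring.kafka.producer.transaction-id-prefix=tx-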

(5). Listener

public static class Listener {

	public enum Type {

		/**
		 * Invokes the endpoint with one ConsumerRecord at a time.
		 */
		SINGLE,

		/**
		 * Invokes the endpoint with a batch of ConsumerRecords.
		 */
		BATCH

	}

	/**
	 * Listener type: whether the endpoint is invoked with a single ConsumerRecord or
	 * with a batch of ConsumerRecords.
	 */
	private Type type = Type.SINGLE;

	/**
	 * Listener AckMode. See the spring-kafka documentation.
	 */
	private AckMode ackMode;

	/**
	 * Prefix for the listener's consumer client.id property.
	 */
	private String clientId;

	/**
	 * Number of threads to run in the listener containers.
	 */
	private Integer concurrency;

	/**
	 * Timeout to use when polling the consumer.
	 */
	private Duration pollTimeout;

	/**
	 * Multiplier applied to "pollTimeout" to determine if a consumer is
	 * non-responsive.
	 */
	private Float noPollThreshold;

	/**
	 * Number of records between offset commits when ackMode is "COUNT" or
	 * "COUNT_TIME".
	 */
	private Integer ackCount;

	/**
	 * Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
	 */
	private Duration ackTime;

	/**
	 * Time between publishing idle consumer events (no data received).
	 */
	private Duration idleEventInterval;

	/**
	 * Time between checks for non-responsive consumers. If a duration suffix is not
	 * specified, seconds will be used.
	 */
	@DurationUnit(ChronoUnit.SECONDS)
	private Duration monitorInterval;

	/**
	 * Whether to log the container configuration during initialization (INFO level).
	 */
	private Boolean logContainerConfig;
}
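
Unlike the consumer and producer blocks, these fields configure the spring-kafka listener containers rather than the raw Kafka client; they bind under spring.kafka.listener. A minimal sketch with illustrative values:

# single invokes the @KafkaListener method with one ConsumerRecord, batch with a list of records
spring.kafka.listener.type=single
spring.kafka.listener.ack-mode=record
spring.kafka.listener.concurrency=3
spring.kafka.listener.poll-timeout=5s
# monitor-interval defaults to seconds if no suffix is given
spring.kafka.listener.monitor-interval=30s
spring.kafka.listener.log-container-config=true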

(6). Summary

By reading the source, we can see much more clearly which configuration items are available. A follow-up post will take a deeper look at the @KafkaListener annotation.