(1). 概述

在这一小节,主要学习Spring Cloud Stream与Kafka的整合,后面会去剖析业务模型和部份源码.

(2). 项目结构如下

生产者负责生产消息,消费者负责消费消息.

lixin-macbook:Workspace lixin$ tree spring-cloud-stream-kafka-parent
spring-cloud-stream-kafka-parent
├── pom.xml
├── spring-cloud-stream-kafka-consumer                 # 消费者消费者
│   ├── pom.xml
│   ├── src
│   │   ├── main
│   │   │   ├── java
│   │   │   │   └── help
│   │   │   │       └── lixin
│   │   │   │           └── example
│   │   │   │               └── stream
│   │   │   │                   ├── ConsumerApplication.java
│   │   │   │                   └── service
│   │   │   │                       └── RecieveService.java
│   │   │   └── resources
│   │   │       └── application.properties
│   │   └── test
│   │       └── java
│   └── target
├── spring-cloud-stream-kafka-provider               # 消息生产者
│   ├── pom.xml
│   ├── src
│   │   ├── main
│   │   │   ├── java
│   │   │   │   └── help
│   │   │   │       └── lixin
│   │   │   │           └── example
│   │   │   │               └── stream
│   │   │   │                   ├── ProviderApplication.java
│   │   │   │                   ├── controller
│   │   │   │                   │   └── ProducerController.java
│   │   │   │                   └── service
│   │   │   │                       └── SendService.java
│   │   │   └── resources
│   │   │       └── application.properties
│   │   └── test
│   │       └── java
│   └── target
└── src
    ├── main
    │   ├── java
    │   └── resources
    └── test
        └── java

(3). 引入依赖

<dependencies>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-web</artifactId>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-test</artifactId>
		<scope>test</scope>
	</dependency>
	<dependency>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-actuator</artifactId>
	</dependency>
	
    <!-- stream -->
	<dependency>
		<groupId>org.springframework.cloud</groupId>
		<artifactId>spring-cloud-stream-binder-kafka</artifactId>
		<exclusions>
			<exclusion>
				<groupId>org.apache.kafka</groupId>
				<artifactId>kafka-clients</artifactId>
			</exclusion>
		</exclusions>
	</dependency>
	<dependency>
		<groupId>org.apache.kafka</groupId>
		<artifactId>kafka-clients</artifactId>
		<version>2.8.1</version>
	</dependency>
</dependencies>

(4). SendService

package help.lixin.example.stream.service;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)
public class SendService {

	@Autowired
	private Source source;

	public void sendMsg(String msg) {
		source.output().send(MessageBuilder.withPayload(msg).build());
	}
}

(5). ProducerController

package help.lixin.example.stream.controller;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import help.lixin.example.stream.service.SendService;

@RestController
public class ProducerController {

	@Autowired
	private SendService sendService;

	@RequestMapping("/send/{msg}")
	public void send(@PathVariable("msg") String msg) {
		sendService.sendMsg(msg);
	}
}

(6). application.properties

server.port=6060
spring.application.name=stream-provider

# kafka定义
spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
spring.cloud.stream.kafka.binder.auto-create-topics=true
spring.cloud.stream.kafka.binder.required-acks=-1

# 定义channel(output)与topic的关系.
spring.cloud.stream.bindings.output.destination=hello-world
spring.cloud.stream.bindings.output.content-type=text/plain

(7). ProviderApplication

package help.lixin.example.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ProviderApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProviderApplication.class, args);
	}
}

(7). RecieveService

package help.lixin.example.stream.service;

import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

@EnableBinding(Sink.class)
public class RecieveService {

	@StreamListener(Sink.INPUT)
	public void recieve(String payload) {
		System.out.println(payload);
	}
}

(8). application.properties

server.port=6061
spring.application.name=stream-consumer

spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
spring.cloud.stream.kafka.binder.auto-create-topics=true

spring.cloud.stream.bindings.input.destination=hello-world
#spring.cloud.stream.kafka.bindings.input.group=${spring.application.name}

(9). ConsumerApplication

package help.lixin.example.stream;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ConsumerApplication {
	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class, args);
	}
}

(10). 查看kafka所有topic

lixin-macbook:bin lixin$ ./kafka-topics.sh  --zookeeper 127.0.0.1:2181  --list
__consumer_offsets
hello-world

(11). 测试运行

lixin-macbook:~ lixin$ curl http://localhost:6060/send/hello
lixin-macbook:~ lixin$ curl http://localhost:6060/send/world

(12). 总结

通过Spring Cloud Stream之后,我们可以做到在代码级别无缝的切换MQ厂商.