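Spring Cloud: Message-Driven Microservices with Spring Cloud Stream
Spring Cloud Stream is a framework for building message-driven microservices. Built on Spring Boot, it uses Spring Integration to connect to message broker middleware (RabbitMQ, Kafka, and others), provides opinionated auto-configuration, and introduces three core concepts: publish-subscribe, consumer groups, and partitioning.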
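To make the consumer-group idea concrete, here is a minimal in-memory sketch in plain Java. It is not Spring Cloud Stream code: the class name ConsumerGroupSketch, the group names "billing" and "audit", and the round-robin choice inside a group are all assumptions made for illustration. It only models the semantics: every group receives each message, but within a group only one instance does.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of publish-subscribe with consumer groups; not Spring code. */
public class ConsumerGroupSketch {

    // group name -> inboxes of the consumer instances in that group
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // group name -> number of messages delivered to that group so far
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    /** Adds a consumer instance to a group and returns the list it receives messages in. */
    public List<String> subscribe(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        delivered.putIfAbsent(group, 0);
        return inbox;
    }

    /** Every group gets the message; within a group the instances take turns (assumed round-robin). */
    public void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> inboxes = entry.getValue();
            int count = delivered.get(entry.getKey());
            inboxes.get(count % inboxes.size()).add(message);
            delivered.put(entry.getKey(), count + 1);
        }
    }

    public static void main(String[] args) {
        ConsumerGroupSketch topic = new ConsumerGroupSketch();
        List<String> billing1 = topic.subscribe("billing");
        List<String> billing2 = topic.subscribe("billing");
        List<String> audit = topic.subscribe("audit");

        topic.publish("order-1");
        topic.publish("order-2");

        System.out.println("billing-1: " + billing1); // [order-1]
        System.out.println("billing-2: " + billing2); // [order-2]
        System.out.println("audit:     " + audit);    // [order-1, order-2]
    }
}
```

The two "billing" instances split the messages between them, while the single "audit" instance sees both. That is the behaviour a consumer group is meant to provide when a service is scaled out to several instances.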
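An application talks to a Spring Cloud Stream binder through input and output channels, and configuration determines how each channel is bound to a destination. The binder, in turn, handles all communication with the middleware.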
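The layering described here (application -> channel -> binder -> middleware) can be sketched in plain Java. This is a toy model rather than the framework's real API: the Binder interface, the Channel class and RecordingBinder are hypothetical stand-ins, and the "broker" is just a list that records what would have been sent.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the application -> channel -> binder -> broker layering; not Spring code. */
public class BinderSketch {

    /** Hypothetical stand-in for a binder: the only piece that knows the middleware. */
    interface Binder {
        void bindOutput(String destination, Channel channel);
    }

    /** Hypothetical stand-in for the message channel that application code sees. */
    static class Channel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        void send(String payload) {
            handlers.forEach(h -> h.accept(payload));
        }
    }

    /** Pretends to forward messages to a broker by recording what would be sent. */
    static class RecordingBinder implements Binder {
        final String brokerName;
        final List<String> sent = new ArrayList<>();

        RecordingBinder(String brokerName) {
            this.brokerName = brokerName;
        }

        @Override
        public void bindOutput(String destination, Channel channel) {
            channel.subscribe(payload -> sent.add(brokerName + ":" + destination + ":" + payload));
        }
    }

    /** Application code: it only touches the channel, never the binder or the broker. */
    static void sendOrder(Channel output) {
        output.send("hello world");
    }

    public static void main(String[] args) {
        for (String broker : new String[] {"rabbit", "kafka"}) {
            RecordingBinder binder = new RecordingBinder(broker);
            Channel output = new Channel();
            binder.bindOutput("myInput", output); // the binding step, normally driven by configuration
            sendOrder(output);
            System.out.println(binder.sent);
        }
    }
}
```

Note that sendOrder is identical for both brokers; only the binder instance changes. That separation is what lets an application change its middleware without touching business code.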
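Development tool: IntelliJ IDEA 2019.2.3
I. Server
1. Create the project
In IDEA, create a new Spring Boot project named "spring-server". Choose Spring Boot version 2.1.10 and, on the Dependencies screen, select Spring Cloud Discovery -> Eureka Server.
The complete pom.xml is as follows: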

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-server</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-server</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
Set the port to 8761. Disable registering this instance with the Eureka server, and disable fetching the registry from it.
server:
  port: 8761
eureka:
  client:
    register-with-eureka: false
    fetch-registry: false
3. Modify the startup class
Add the @EnableEurekaServer annotation.

package com.example.springserver;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class SpringServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringServerApplication.class, args);
    }
}
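II. Message producer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-producer". Choose Spring Boot version 2.1.10 and, on the Dependencies screen, select Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency, which transitively pulls in spring-cloud-stream and the RabbitMQ binder (spring-cloud-stream-binder-rabbit).
The complete pom.xml is as follows: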

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-producer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-producer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream-test-support</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
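2. Edit application.yml
Because pom.xml pulls in the RabbitMQ binder, the application connects to port 5672 on localhost by default, so the rabbitmq block below can be omitted.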
server:
  port: 8081
spring:
  application:
    name: spring-producer
  # Optional: these values match the defaults
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
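3. Write the sending service
The sendOrder method is annotated with @Output("myInput"), which declares an output channel named myInput. Messages sent on the channel returned by this method are delivered to myInput.
If the myInput argument is omitted, the method name is used as the channel name.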
package com.example.springproducer;

import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.SubscribableChannel;

public interface SendService {

    // Declares the output channel named "myInput"
    @Output("myInput")
    SubscribableChannel sendOrder();
}
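The naming rule (explicit annotation value first, method name as the fallback) can be illustrated with a small reflection sketch in plain Java. The @Out annotation and the sendInvoice method below are hypothetical stand-ins invented for this example; they only mimic the rule stated above and say nothing about how the framework resolves names internally.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

public class ChannelNameSketch {

    /** Hypothetical stand-in for an output-channel annotation (not the Spring one). */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Out {
        String value() default "";
    }

    interface SendService {
        @Out("myInput")
        Object sendOrder();   // explicit name given

        @Out
        Object sendInvoice(); // no name given, so the method name is used
    }

    /** Resolves the channel name: the annotation value if non-empty, else the method name. */
    static String channelName(Class<?> bindingInterface, String methodName) {
        for (Method method : bindingInterface.getMethods()) {
            Out out = method.getAnnotation(Out.class);
            if (out != null && method.getName().equals(methodName)) {
                return out.value().isEmpty() ? method.getName() : out.value();
            }
        }
        throw new IllegalArgumentException("No @Out method named " + methodName);
    }

    public static void main(String[] args) {
        System.out.println(channelName(SendService.class, "sendOrder"));   // myInput
        System.out.println(channelName(SendService.class, "sendInvoice")); // sendInvoice
    }
}
```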
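4. Modify the startup class
Add the @EnableBinding annotation to turn on binding in the Spring container, passing SendService.class as its argument. When the container starts, it automatically binds the channels declared in the SendService interface.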
package com.example.springproducer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.stream.annotation.EnableBinding;

@SpringBootApplication
@EnableEurekaClient
@EnableBinding(SendService.class)
public class SpringProducerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringProducerApplication.class, args);
    }
}
5. Add a controller class
The controller obtains the channel from SendService and uses it to send a message to the message broker.
package com.example.springproducer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.messaging.Message;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    SendService sendService;

    @RequestMapping(value = "/send", method = RequestMethod.GET)
    public String sendRequest() {
        // Build the message with a byte[] payload
        Message<byte[]> msg = MessageBuilder.withPayload("hello world".getBytes()).build();
        // Send the message on the output channel
        sendService.sendOrder().send(msg);
        return "SUCCESS";
    }
}
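III. Message consumer
1. Create the project
In IDEA, create a new Spring Boot project named "spring-consumer". Choose Spring Boot version 2.1.10 and, on the Dependencies screen, select Web -> Spring Web and Spring Cloud Discovery -> Eureka Discovery Client.
Open pom.xml and add the spring-cloud-starter-stream-rabbit dependency. The complete pom.xml is as follows: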

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.example</groupId>
    <artifactId>spring-consumer</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>spring-consumer</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <java.version>1.8</java.version>
        <spring-cloud.version>Greenwich.SR4</spring-cloud.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
2. Edit application.yml
server:
  port: 8080
spring:
  application:
    name: spring-consumer
  # Optional: these values match the defaults
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  instance:
    hostname: localhost
  client:
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/
3. Write the channel interface for receiving messages
package com.example.springconsumer;

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface ReceiveService {

    // Declares the input channel named "myInput"
    @Input("myInput")
    SubscribableChannel myInput();
}
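4. Modify the startup class
Bind the message channel in the same way as in the producer, and add a listener method that subscribes to it.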
package com.example.springconsumer;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@SpringBootApplication
@EnableBinding(ReceiveService.class)
public class SpringConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringConsumerApplication.class, args);
    }

    // Subscribe to messages arriving on the myInput channel
    @StreamListener("myInput")
    public void receive(byte[] msg) {
        // The prefix "接收到的消息:" means "Received message:"
        System.out.println("接收到的消息:" + new String(msg));
    }
}
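5. Test
(1) Check that the RabbitMQ service is running (it starts automatically by default);
(2) Start spring-server (port 8761);
(3) Start spring-producer (port 8081);
(4) Start spring-consumer (port 8080);
(5) Open http://localhost:8081/send in a browser. The spring-consumer console prints the following (the prefix means "Received message:"):
接收到的消息:hello world
This shows that the consumer can now receive messages from the message broker.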
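One detail of this round trip is worth spelling out. The payload travels as raw bytes, and in Java both String.getBytes() and new String(byte[]) fall back to the platform default charset when none is given. With ASCII text such as "hello world" this makes no difference, but if the producer and consumer run on machines with different defaults, non-ASCII text may come out garbled. A minimal sketch of an explicit UTF-8 round trip follows; the class and method names are hypothetical, and choosing UTF-8 is an assumption, not something the example above requires.

```java
import java.nio.charset.StandardCharsets;

public class PayloadCharsetSketch {

    /** Producer side: turn the text into the bytes that travel through the broker. */
    static byte[] encode(String text) {
        return text.getBytes(StandardCharsets.UTF_8);
    }

    /** Consumer side: turn the received bytes back into text with the same charset. */
    static String decode(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] wire = encode("hello world");
        System.out.println(wire.length);  // 11
        System.out.println(decode(wire)); // hello world
    }
}
```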
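IV. Switching the binder
The example above uses RabbitMQ as the message broker. To use Kafka instead, swap the Maven dependency.
In the pom.xml of both the producer and the consumer, replace spring-cloud-starter-stream-rabbit with spring-cloud-starter-stream-kafka.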