一、Spring Cloud Stream
在企业开发中,消息中间件是重要的组件之一。消息中间件主要解决应用解耦、异步消息、流量削锋等问题,实现高性能,高可用,可伸缩和最终一致性架构。不同的中间件其实现方式,内部结构是不一样的。如常见的RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic,partitions分区,这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候 springcloud Stream 给我们提供了一种解耦合的方式。
1.1.Spring Cloud Stream概述
Spring Cloud Stream由一个中间件中立的核组成。应用通过Spring Cloud Stream插入的input(相当于消费者consumer,它是从队列中接收消息的)和output(相当于生产者producer,它是从队列中发送消息的。)通道与外界交流。通道通过指定中间件的Binder实现与外部代理连接。业务开发者不再关注具体消息中间件,只需关注Binder对应用程序提供的抽象概念来使用消息中间件实现业务即可。Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动配置,引用了发布、订阅、消费、分区的三个核心概念。官方版本目前仅仅支持RabbitMQ和Kafka。
最底层是消息服务,中间层是绑定层,绑定层和底层的消息服务进行绑定,顶层是消息生产者和消息消费者,顶层可以向绑定层生产消息和和获取消息消费
1.2.核心概念说明
- 定器
Binder 绑定器是Spring Cloud Stream中一个非常重要的概念。在没有绑定器这个概念的情况下,我们的Spring Boot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,这使得我们实现的消息交互逻辑就会非常笨重,因为对具体的中间件实现细节有太重的依赖,当中间件有较大的变动升级、或是更换中间件的时候,我们就需要付出非常大的代价来实施。通过定义绑定器作为中间层,实现了应用程序与消息中间件(Middleware)细节之间的隔离。通过向应用程序暴露统一的Channel(渠道)通过,使得应用程序不需要再考虑各种不同的消息中间件的实现。当需要升级消息中间件,或者是更换其他消息中间件产品时,我们需要做的就是更换对应的Binder绑定器而不需要修改任何应用逻辑 。甚至可以任意的改变中间件的类型而不需要修改一行代码。Spring Cloud Stream支持各种binder实现,通过配置把应用和spring cloud stream 的 binder 绑定在一起,之后我们只需要修改 binder 的配置来达到动态修改topic、exchange、type等一系列信息而不需要修改一行代码。
- 发布/订阅模型
在Spring Cloud Stream中的消息通信方式遵循了发布-订阅模式,当一条消息被投递到消息中间件之后,它会通过共享的 Topic 主题进行广播,消息消费者在订阅的主题中收到它并触发自身的业务逻辑处理。这里所提到的 Topic 主题是Spring Cloud Stream中的一个抽象概念,用来代表发布共享消息给消费者的地方。在不同的消息中间件中, Topic 可能对应着不同的概念,比如:在RabbitMQ中的它对应了Exchange、而在Kakfa中则对应了Kafka中的Topic。
1.3.MQ相关术语
- Message:生产者/消费者之间靠消息媒介传递信息内容
- MessageChannel:消息必须走特定的通道
- 消息通道的子接口SubscribableChannel,由MessageHandle消息处理器所订阅。
1.4.相关注解
- Middleware:中间件,目前只支持RabbitMQ和Kafka
- Binder:应用层和消息中间件之间的封装,实现了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型,这些可以通过配置文件修改。
- Input:表示输入通道,消息进入该通道传到应用程序。
- Output:注解标识输出通道,发布的消息将通过该通道离开应用程序。
- StreamListener:监听队列,用于消费者的队列的消息接收。
- EnableBinding:将信道channel和exchange绑定在一起。
二、案例演示环境说明
1.需要准备RabbitMQ环境
这里可以使用上一个章节的环境即可
2.创建三个子项目:
- cloud-stream-rabbitmq-provider8801,作为生产者进行消息模块
- cloud-stream-rabbitmq-consumer8802,作为消费者接受模块
- cloud-stream-rabbitmq-consumer8803,作为消费者接受模块
三、消息驱动之生产者
3.1.新建模块 cloud-stream-rabbitmq-provider8801
创建maven模块:
设置模块名:
3.2.在pom中引入依赖即可
这里直接复制:
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.augus.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
</dependencies>
3.3.创建配置文件
在resources目录下创建application.yml 配置文件,内容如下:
server:
port: 8801
spring:
application:
name: cloud-stream-privider
rabbitmq:
host: 192.168.93.129
port: 5672
username: root
password: root
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
bindings: #服务的整合处理
output: #这个名字是一个通道的名称
destination: cruiiExchange #表示要使用的exchange名称定义 在rabbitmq中可以看到
content-type: application/json #设置消息类型,本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka/
instance:
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
instance-id: send-8801.com
prefer-ip-address: true #访问的路径变为IP地址
3.4.设置主启动类
主启动类 StreamMQMain8801 内容如下:
package com.augus.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class,args);
}
}
3.5.创建业务类
创建service 包,在下面创建 IMessageService 接口,代码如下:
package com.augus.cloud.service;
public interface IMessageService {
String send();
}
在项目impl 包,然后创建 IMessageServiceImpl 实现类:
package com.augus.cloud.service.impl;
import com.augus.cloud.service.IMessageService;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import java.util.UUID;
@EnableBinding(Source.class) //定义消息推送通道
public class IMessageServiceImpl implements IMessageService {
//消息发送的通道
private MessageChannel output;
@Override
public String send() {
//创建一个UUID,模拟发送过去的数据
String uuid = UUID.randomUUID().toString();
//发送信息
output.send(MessageBuilder.withPayload(uuid).build());
return uuid;
}
}
3.6.创建包controller
在下面创建:SendMessageController
package com.augus.cloud.controller;
import com.augus.cloud.service.IMessageService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class SendMessageController {
@Autowired
private IMessageService messageService;
@RequestMapping("/sendMessage")
public String messageSend(){
String send = messageService.send();
return send;
}
}
3.7.测试
在 浏览器访问:http://localhost:8801/sendMessage 显示如下图:
查看rabbitMQ即可看到exchange的名字:
同时看看目前的流量访问情况
四、消息驱动-消费者创建
这里的创建流程基本和生产者的module一样
4.1.创建模块
这里创建的是maven模块:
设置模块名字:
4.2.在pom.xml中引入依赖
这里赋值即可
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.augus.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.19.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
4.3.创建配置文件
在resources目录下创建 application.yml 内容如下:
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: 192.168.93.129
port: 5672
username: root
password: root
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: cruiiExchange #表示要使用的exchange名称定义 在rabbitmq中可以看到
content-type: application/json #设置消息类型,本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka/,http://localhost:7002/eureka/
instance:
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
instance-id: send-8802.com
prefer-ip-address: true #访问的路径变为IP地址
4.4.创建主启动类
在 com.augusu.cloud 下创建 StreamMQMain8802 内容如下:
package com.augus.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
4.5.创建监听类
在 controller 下创建 ReceiveMessageListenerController ,代码如下:
package com.augus.cloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者1号,端口号:"+serverPort+"-----》接受到的消息是:"+message.getPayload()+"\t");
}
}
4.6.测试
重启7001、7002和8801,然后再启动8802,测试8801发送消息,8802能读取到信息:
在浏览访问:http://localhost:8801/sendMessage 可以多刷新几次,查看8801,即可看到发送的信息
8802接受到的消息会发现跟8801的消息是一样的,如下图:
五、分组消费与持久化
先需要根据8802,创建8803
5.1.创建模块8803
5.1.1创建maven项目
设置模块名:
5.1.2.在pom.xml中引入依赖
这里赋值即可
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.augus.springcloud</groupId>
<artifactId>cloud-api-common</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-server</artifactId>
<version>1.19.1</version>
<scope>compile</scope>
</dependency>
</dependencies>
5.1.3.创建配置文件
在resources目录下创建 application.yml 内容如下:
server:
port: 8803
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: 192.168.93.129
port: 5672
username: root
password: root
cloud:
stream:
binders: #自此处配置要绑定的rabbitmq的服务信息
defaultRabbit: #表示定义的名称,用于binding整合
type: rabbit #消息组件类型
bindings: #服务的整合处理
input: #这个名字是一个通道的名称
destination: cruiiExchange #表示要使用的exchange名称定义 在rabbitmq中可以看到
content-type: application/json #设置消息类型,本次为json
binder: defaultRabbit #设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka/,http://localhost:7002/eureka/
instance:
lease-expiration-duration-in-seconds: 5 #如果现在超过了5秒的间隔
lease-renewal-interval-in-seconds: 2 #设置心跳的时间间隔
instance-id: send-8803.com
prefer-ip-address: true #访问的路径变为IP地址
5.1.4.创建主启动类
在 com.augusu.cloud 下创建 StreamMQMain8802 内容如下:
package com.augus.cloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8803 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8803.class, args);
}
}
5.1.5.创建监听类
在 controller 下创建 ReceiveMessageListenerController ,代码如下:
package com.augus.cloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message){
System.out.println("消费者2号,端口号:"+serverPort+"-----》接受到的消息是:"+message.getPayload()+"\t");
}
}
5.1.6.测试
启动8803,通过访问8801的http://localhost:8801/sendMessage发送消息 如下图,8801的控制台输出:
查看8802控制台打印的信息:
查看8803控制台打印的信息:
上述案例运行后存在如下问题:
- 重复消费问题:8802/8803同时收到了消息,存在重复消费的问题,就需要通过分组和持久化属性group解决
- 消息持久化问题
5.2.分组
5.2.1.分组的原理
故障现象:重复消费
故障原因:默认分组group是不同的,组流水号不一样,被认为不同组可以消费,如下图所示
上述问题,只需要自定义分组,自定义配置分为同一个组,来解决重复消费的问题,微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。不同的组是可以消费的,同一个组内会发生竞争关系,只有其中一个可以消费。
5.2.2.设置将8802/8803都变成不同组
修改8802的aapplication.yml文件,添加分组
group: group1
如下图:
修改8803的aapplication.yml文件,添加分组
group: group2
如下图:
重启项目访问,会发现8802和8803在不同的组,依然存在重复消费的问题
5.2.3.问题解决:
8802/8803实现了轮询分组,每一次只有一个消费者。8801模块的发的消息只能被8802或者8803其中一个接受到,这样避免了重复消费,所有要8803/8802都变成相同组,group是同一个组:
修改8803的application.yml文件中的组为group1,跟8803的组保持一致:
然后重启8803,浏览器访问:http://localhost:8801/sendMessage,8801控制台出现问题如下:
查看8802.如图:
查看8803.如图:
结论:同一个组的多个微服务实例,每次只会有一个拿到
5.3.持久化
通过上述操作解决了重复消费的问题,再看看持久化,先停止8802和8803;删除掉8802的分组group:group1;8003的分组属性保留
- 浏览器访问:http://localhost:8801/sendMessage 8801先发送4条消息到rabbitmq
- 先启动8802,无分组属性配置,后天没有打出来消息
- 再启动8803,有分组属性配置,后台打出来了MQ上的消息