简介:
在springcloud-stream出现之前,我们在工作的过程中,需要根据项目的技术选型使用不同的基于AMQP的消息中间件,譬如常用的RabbitMQ、RocketMQ、Kafka等等,学习成本比较的高,能不能有类似与Hibernate这样子的框架,来统一数据访问,从而不需要过多的掌握底层的实现。在这样子大的背景下面springcloud-stream诞生了。主要用来统一底层消息中间件的差异,从而降低不同中间件之间切换的成本。
架构图:
标准流程:
编码API和注解:
关键代码如下:
消息发送方的关键代码:
父工程的坐标
<packaging>pom</packaging>
<description>父工程进行坐标的统一的管理</description>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<junit.version>4.12</junit.version>
<lombok.version>1.18.12</lombok.version>
<mysql.version>5.1.38</mysql.version>
<druid.version>1.1.23</druid.version>
<mybatis-spring-boot.version>2.1.3</mybatis-spring-boot.version>
<spring-boot.version>2.3.2.RELEASE</spring-boot.version>
<spring-cloud.version>Hoxton.SR9</spring-cloud.version>
<spring-cloud-alibaba.version>2.2.6.RELEASE</spring-cloud-alibaba.version>
</properties>
<dependencyManagement>
<dependencies>
<!--配置springboot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--springcloud-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring-cloud.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--springcloud alibaba-->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-alibaba-dependencies</artifactId>
<version>${spring-cloud-alibaba.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
</dependency>
<!--这是mysql-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>${mysql.version}</version>
</dependency>
<!--这是druid-->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>${druid.version}</version>
</dependency>
<!--这是mybatis-->
<dependency>
<groupId>org.mybatis.spring.boot</groupId>
<artifactId>mybatis-spring-boot-starter</artifactId>
<version>${mybatis-spring-boot.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<finalName>microservicecloud2022</finalName>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork>
<addResources>true</addResources>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<configuration>
<delimiters>
<delimit>@</delimit>
</delimiters>
</configuration>
</plugin>
</plugins>
</build>
</project>
结构如下:
- 工程的名字
cloud-stream-rabbitmq-provider8801
- 工程的pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>microservicecloud2022</artifactId>
<groupId>com.edu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<description>这是消息的生成者</description>
<dependencies>
<!--引入web模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入actuator模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--引入springboot的测试模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--eureka客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- 消息总线-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--添加热部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
</dependencies>
</project>
- 工程的yml文件
server:
port: 8801
spring:
application:
name: cloud-stream-rabbitmq-provider
cloud:
stream:
binders: ## 此处配置要绑定的rabbitmq的信息
defaultRabbit: # 表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 120.77.176.159
port: 5673
username: guest
password: guest
bindings: # 服务的整合处理
output: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的exchange的通道的名称
content-type: application/json # 设置消息的类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置(红色可以不用管)
eureka:
client:
service-url: #注册中心的地址
#defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/,http://eureka7003.com:7003/eureka/
defaultZone: http://eureka7001.com:7001/eureka/
instance:
instance-id: cloud-stream-rabbitmq-provider8801
prefer-ip-address: true
lease-expiration-duration-in-seconds: 5
lease-renewal-interval-in-seconds: 2
info:
app.name: microservicecloud2022
company.name: www.edu.com
build.artifactId: "@project.artifactId@"
build.version: "@project.version@"
- service接口
package com.edu.springcloud.service;
public interface IMessageProvider {
public String send(String message) ;
}
- service的实现
package com.edu.springcloud.service.impl;
import com.edu.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;
import java.util.UUID;
@EnableBinding(Source.class)
public class MessageProviderImpl implements IMessageProvider {
@Autowired
private MessageChannel output ;
@Override
public String send(String message) {
String uuid = UUID.randomUUID().toString() ;
System.out.println("*********************:"+message+":"+uuid);
output.send(MessageBuilder.withPayload(message+":"+uuid).build()) ;
return message;
}
}
- 控制器
package com.edu.springcloud.controller;
import com.edu.springcloud.service.IMessageProvider;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class MessageProviderController {
@Autowired
private IMessageProvider messageProvider ;
@GetMapping("/sendMessage")
public String sendMessage(String message) {
return messageProvider.send(message) ;
}
}
- 主启动类
package com.edu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.math.BigInteger;
@SpringBootApplication
public class MessageProviderMain8801_APP {
public static void main(String[] args) {
SpringApplication.run(MessageProviderMain8801_APP.class,args) ;
}
}
消息消费方的关键代码:
结构如下:
- 工程的名字
cloud-stream-rabbitmq-consumer8802
- 工程pom文件
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>microservicecloud2022</artifactId>
<groupId>com.edu</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<description>这是消息的消费者</description>
<dependencies>
<!--引入web模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--引入actuator模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<!--引入springboot的测试模块-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<!--eureka客户端-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-config</artifactId>
</dependency>
<!-- 消息总线-->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!--添加热部署-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
</dependency>
</dependencies>
</project>
- 工程的yaml文件
server:
port: 8802
spring:
application:
name: cloud-stream-rabbitmq-consumer
cloud:
stream:
binders: ## 此处配置要绑定的rabbitmq的信息
defaultRabbit: # 表示定义的名称,用于binding整合
type: rabbit #消息组件类型
environment: #设置rabbitmq的相关的环境配置
spring:
rabbitmq:
host: 120.77.176.159
port: 5673
username: guest
password: guest
bindings: # 服务的整合处理
input: #这个名字是一个通道的名称
destination: studyExchange #表示要使用的exchange的通道的名称
content-type: application/json # 设置消息的类型
binder: defaultRabbit #设置要绑定的消息服务的具体设置(红色可以不用管)
group: rabbitmqA
eureka:
client:
service-url: #注册中心的地址
#defaultZone: http://eureka7001.com:7001/eureka/,http://eureka7002.com:7002/eureka/,http://eureka7003.com:7003/eureka/
defaultZone: http://eureka7001.com:7001/eureka/
instance:
instance-id: cloud-stream-rabbitmq-consumer8802
prefer-ip-address: true
lease-expiration-duration-in-seconds: 5
lease-renewal-interval-in-seconds: 2
info:
app.name: microservicecloud2022
company.name: www.edu.com
build.artifactId: "@project.artifactId@"
build.version: "@project.version@"
- 控制器
package com.edu.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;
@RestController
@EnableBinding(Sink.class)
public class MessageConsumerController {
@Value("${server.port}")
private String serverPort ;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("消费者1号,接收到的消息:"+message.getPayload()+"\t port:"+serverPort);
}
}
- 主启动类
package com.edu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamConsumer8802_APP {
public static void main(String[] args) {
SpringApplication.run(StreamConsumer8802_APP.class,args) ;
}
}
消息的重复消费问题:
注意:不同分组的消费方,都可以接收到消息,同一个分组的消费方,通过竞争的方式来进行消息的接收,只能有一个消费方能够接收到消息。
消息的持久化的问题:
测试环境的搭建:
- 把上述的消费方 clone 一份 ,端口改为 8803,
- 停止8802和8803两个消息消费方,8802去掉 group 分组,8803保留group分组
- 消息发送方发送四条消息,
- 先启动8802,因为8802去掉了group分组,所以没有接收到消息
- 启动8803,没有去掉grup分组,发现它接收到了 四条消息
总结:
group 分组属性,在消息重复消费,消息持久话消费,和避免消息丢失上是一个非常重要的属性。