0
点赞
收藏
分享

微信扫一扫

SpringCloud Stream基本使用

介绍

主要应用场景

可能我们会遇到不同的系统在用不同的消息队列,比如系统A用的Kafka、系统B用的RabbitMQ,但是我们现在又没有学习过Kafka,那么怎么办呢?有没有一种方式像JDBC一样,我们只需要关心SQL和业务本身,而不用关心数据库的具体实现呢?

SpringCloud Stream能够做到,它能够屏蔽底层实现,我们使用统一的消息队列操作方式就能操作多种不同类型的消息队列。

img

它屏蔽了不同消息队列底层操作,让我们使用统一的Input和Output形式,以Binder为中间件,这样就算我们切换了不同的消息队列,也无需修改代码,而具体某种消息队列的底层实现是交给Stream在做的。

实践

Linux安装RabbitMQ

接下来创建一个springboot父子项目来演示一下其基本使用(以rabbitmq为例):

image-20220928141522794

父工程依赖:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-dependencies</artifactId>
    <version>2021.0.1</version>
    <type>pom</type>
    <scope>import</scope>
</dependency>

image-20220928142106277

子模块依赖:

<dependencies>
    <!--  RabbitMQ的Stream实现  -->
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
</dependencies>

image-20220928142150258

生产者

生产者配置文件:

server:
  port: 8801

spring:
  application:
    name: stream-publisher
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道channel的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: text/plain  # 设置消息类型,application/json -> json格式,本文要设置为 text/plain -> 文本类型

定义发送消息接口:

public interface IMessagePublisher {
    void publish(String message);
}

接口实现类:

@EnableBinding(Source.class)    // 定义消息的推送管道(Source是spring的)
@Slf4j
public class MessagePublishImpl implements IMessagePublisher {

    @Resource
    private MessageChannel output;  // 消息发送管道

    @Override
    public void publish(String message) {

        log.info("发送消息:{}", message);
        // MessageBuilder是spring的integration.support.MessageBuilder
        output.send(MessageBuilder.withPayload(message).build());

    }

}

Source接口

image-20220930212128357

新建controller.PublishController

@RestController
public class PublishController {

    @Resource
    IMessagePublisher publisher;

    @RequestMapping("/publish")
    public String publish(String message) {
        publisher.publish(message);
        return "消息发送成功!" + new Date();
    }
}

消费者

配置文件:

server:
  port: 8802

spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道channel的名称
          destination: studyExchange # 表示要使用的Exchange名称
          content-type: text/plain  # 设置消息类型,application/json -> json格式,本文要设置为 text/plain -> 文本类型

监听消息:

@EnableBinding(Sink.class) // (Sink也是spring的)
public class ReceiveMessageListener {

    @StreamListener(Sink.INPUT) // 监听
    public void input(Message<String> message) {
        System.out.println("消费者1号------>收到的消息:" + message.getPayload());
    }
}

Sink接口

image-20220929165116435

注意: output输入信道是stream自带的,还自带了一个输出信道input,上述两个接口。

启动两个项目进行测试,项目启动完成发现studyExchange交换机已经创建好了。

image-20220929165628415

调用发送消息接口,消息发送成功

image-20220929165806075

消费者也成功消费消息

image-20220929165902093

使用自定义信道实现消息传递

上述代码实现是通过stream默认的信道完成的,本部分实现通过自定义信道实现

类比stream默认信道,创建两个自定义信道MySource、MySink

public interface MySource {
    /**
     * Name of the output channel.
     */
    String OUTPUT1 = "output1";

    /**
     * @return output channel
     */
    @Output(OUTPUT1)
    MessageChannel output();
}
public interface MySink {
    /**
     * Input channel name.
     */
    String INPUT1 = "input1";

    /**
     * @return input channel.
     */
    @Input(INPUT1)
    SubscribableChannel input();
}

生产者新增配置项:

image-20220930213141638

同理消费者新增配置项:

image-20220930213310450

接下来改造发送消息实现类

image-20220930213855720

修改该类,EnableBinding注解的值改为绑定多个传入信道接口,然后使用我们自定义信道发送消息。

消费者消费消息

image-20220930214025905

同理,EnableBinding注解的值改为绑定多个信道接口,新建一个方法监听即可。

启动项目,进行测试:

image-20220930214151104

发现我们自定义信道消息也能正常被消费到。

image-20220930214253006

@EnableBinding注解过时

由上图可以看到 @EnableBinding 注解貌似已经过时了。

@EnableBinding源码中明确声明,该注解在从3.1版本开始被弃用,推荐我们使用函数编程的方式

接下来会演示下这种方式基本使用:

生产者案例:

yml配置:

server:
  port: 8801

spring:
  application:
    name: stream-publisher
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        myChannel-out-0:
          destination: demo #表示要使用Exchange名称定义
          contentType: text/plain

注意使用这种方式,bingdings 集合中的key由 通道名-out/in-数字组成

新版发送消息:

@RestController
public class PublishController {

    @Resource
    StreamBridge bridge;

    @RequestMapping("/publish")
    public String publish(String message) {
        bridge.send("myChannel-out-0", message);
        return "消息发送成功!" + new Date();
    }
}

@Autowire注解自动注入StreamBridge的实例,直接使用StreamBridge发送消息,StreamBridge的send方法第一个参数是binding的名字,第二个参数是想要发送的消息。

消费者案例:

yml配置:

server:
  port: 8802

spring:
  application:
    name: stream-consumer
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 表示定义的名称,用于binding整合(随意)
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关环境配置
            spring:
              rabbitmq:
                host: 47.96.156.51  # RabbitMQ在本机的用localhost,在服务器的用服务器的ip地址
                port: 5672
                username: admin
                password: admin
                virtual-host: /
      bindings: # 服务的整合处理
        myChannel-in-0:
          destination: demo
          contentType: text/plain

消费者消费消息:

@Component
public class ConsumerComponent {

    @Bean("myChannel")  
    public Consumer<String> consumer() {
        return message -> System.out.println("新版本消费消息:" + message);
    }
    
    //@Bean
    //public Consumer<String> myChannel() {
    //    return message -> System.out.println("新版本消费消息:" + message);
    //}
}

注意:@Bean里是yml配置文件中通道名称,这样生产者发送的数据才会正确到达,应用程序启动后会自动接收生产者发送的消息;

或者是方法名为yml配置文件中通道名称,两种方式都能正常消费消息。

启动项目进行测试:

交换机正常创建:

image-20220930223918155

消息发送成功:

image-20220930223539563

消息正常被消费:

image-20220930223617667

完毕!

参考博客:

SpringCloud Stream @EnableBinding注解过时

举报

相关推荐

0 条评论