0
点赞
收藏
分享

微信扫一扫

RabbitMQ:@RabbitListener注解简化消息监听


​pom.xml​​:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.6.2</version>
</parent>

<packaging>jar</packaging>

<groupId>com.kaven</groupId>
<artifactId>springboot</artifactId>
<version>0.0.1-SNAPSHOT</version>

<name>springboot</name>
<description>springboot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>

​application.yml​​:

spring:
rabbitmq:
addresses: 192.168.1.9:5672
username: admin
password: admin

​User​​类(消息负载的实体类):

package com.kaven.springboot.rabbitmq;

import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

@Setter
@Getter
@ToString
@AllArgsConstructor
public class User {
private String username;
private String password;
private String code;
}

​Json2UserMessageConverter​​​类(消息转换器,将​​json​​​数据转换成​​User​​​对象,​​json​​​数据由消息体的​​byte[]​​生成):

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.support.converter.MessageConversionException;
import org.springframework.amqp.support.converter.MessageConverter;
import org.springframework.stereotype.Component;

import java.nio.charset.StandardCharsets;

@Component("json2UserMessageConverter")
public class Json2UserMessageConverter implements MessageConverter {

private static final Gson GSON = new Gson();

@Override
public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
return new Message(GSON.toJson(object).getBytes(StandardCharsets.UTF_8));
}

@Override
public Object fromMessage(Message message) throws MessageConversionException {
return GSON.fromJson(new String(message.getBody(), StandardCharsets.UTF_8), User.class);
}
}

​Consumer​​​类(消息监听,使用​​@RabbitListener​​注解简化消息监听):

package com.kaven.springboot.rabbitmq;

import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;

@Component
public class Consumer {

@RabbitListener(
bindings = {
@QueueBinding(
value = @Queue("queue.user"),
exchange = @Exchange(value = "exchange.user", type = ExchangeTypes.TOPIC),
key = {"*.user"}
)
},
messageConverter = "json2UserMessageConverter"

)
public void process(User user) {
System.out.println("Consumer - process 接收消息: " + user);
}
}

​Producer​​类(用于发布消息):

package com.kaven.springboot.rabbitmq;

import com.google.gson.Gson;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.UUID;

@Component
public class Producer {

private static final Gson GSON = new Gson();

@Resource
private final RabbitTemplate rabbitTemplate;

public Producer(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}

public void sendMsg(User user) {
CorrelationData correlationId = new CorrelationData(UUID.randomUUID().toString());
MessageProperties messageProperties = new MessageProperties();
messageProperties.setExpiration("10000");
Message message = new Message(GSON.toJson(user).getBytes(StandardCharsets.UTF_8), messageProperties);
rabbitTemplate.send("exchange.user", "new.user", message, correlationId);
}
}

​ProducerController​​类(用于发布消息的接口):

package com.kaven.springboot.rabbitmq;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class ProducerController {

@Resource
private Producer producer;

@GetMapping("/send")
public String send(User user) {
producer.sendMsg(user);
return "数据发送成功";
}
}

启动类:

package com.kaven.springboot;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class SpringbootApplication {
public static void main(String[] args) {
SpringApplication application = new SpringApplication(SpringbootApplication.class);
application.run(args);
}
}

启动应用,使用​​Postman​​请求接口。

RabbitMQ:@RabbitListener注解简化消息监听_spring


控制台输出:

Consumer - process 接收消息: User(username=kaven, password=itkaven, code=908899)

效果符合预期,使用​​@RabbitListener​​​注解简化了消息监听,不需要自己定义交换机、队列以及绑定关系等​​bean​​​,将这些需要的​​bean​​​全部交给​​Spring Boot​​来管理。博客就介绍到这里,如果博主有说错的地方或者大家有不同的见解,欢迎大家评论补充。


举报

相关推荐

0 条评论