Spring-boot集成RabbitMq
- 导入依赖
- 创建配置类(以发布订阅模式做示例)
- 创建消息消费者
- 创建消息生成者
导入依赖
<!-- rabbitmq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
创建配置类(以发布订阅模式做示例)
package com.ddz.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* @program: my-rabbtimq
* @description 订阅模式
* @author: ddz
* @create: 2021-06-06 16:59
**/
@Configuration
public class FanoutConfig {
/**
* 订阅模式
*/
public static final String FANOUT_QUEUE = "fanout.queue";
public static final String FANOUT_QUEUE1 = "fanout.queue1";
public static final String FANOUT_EXCHANGE = "fanout.exchange";
@Bean
public Queue fanoutQueue() {
return QueueBuilder.durable(FANOUT_QUEUE).build();
}
@Bean
public Queue fanoutQueue1() {
return QueueBuilder.durable(FANOUT_QUEUE1).build();
}
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange(FANOUT_EXCHANGE);
}
@Bean
public Binding bindingQueueToExchange() {
return BindingBuilder.bind(fanoutQueue()).to(fanoutExchange());
}
@Bean
public Binding bindingQueueToExchange1() {
return BindingBuilder.bind(fanoutQueue1()).to(fanoutExchange());
}
}
创建消息消费者
就是接收mq推送消息的一方
package com.ddz.listener;
import com.ddz.config.FanoutConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @program: my-rabbtimq
* @description mq监听
* @author: ddz
* @create: 2021-06-06 17:10
**/
@Slf4j
@Component
public class MqListener {
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE)
public void fanout(String message) {
log.info("fanout接收消息:{}", message);
}
@RabbitListener(queues = FanoutConfig.FANOUT_QUEUE1)
public void fanout1(String message) {
log.info("fanout1接收消息:{}", message);
}
}
创建消息生成者
把消息发送给mq的一方
package com.ddz.controller;
import com.ddz.config.FanoutConfig;
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
import java.io.UnsupportedEncodingException;
/**
* @program: my-rabbtimq
* @description
* @author: ddz
* @create: 2021-06-06 17:16
**/
@Api(tags = "RabbitMq")
@RestController
public class MqController {
@Resource
private RabbitTemplate rabbitTemplate;
@ApiOperation("订阅模式")
@GetMapping("fanout")
public void fanout(String msg) throws UnsupportedEncodingException {
//设置部分请求参数
MessageProperties messageProperties = new MessageProperties();
//设置消息类型
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN);
// 创建消息对象,并把消息放入对象里面
Message message = new Message(msg.getBytes("UTF-8"), messageProperties);
// 实现发送消息到mq
rabbitTemplate.convertAndSend(FanoutConfig.FANOUT_EXCHANGE, "", message);
}
}