1.配置文件
spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.published-returns=true
spring.mvc.pathmatch.matching-strategy=ant_path_matcher
2.配置类
package com.mq.rabbit.config;
import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* 发布确认配置类
*/
@Configuration
public class ConfirmConfig {
// 交换机
private static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
// 队列
private static final String CONFIRM_QUEUE_NAME = "confirm.queue";
// routingKey
private static final String CONFIRM_ROUTING_KEY = "key1";
// 声明交换机
@Bean("confirmExchange")
public DirectExchange confirmExchange() {
return new DirectExchange(CONFIRM_EXCHANGE_NAME);
}
// 声明队列
@Bean("confirmQueue")
public Queue confirmQueue() {
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
// 队列绑定交换机
@Bean
public Binding queueBindingExchange(
@Qualifier("confirmQueue") Queue confirmQueue,
@Qualifier("confirmExchange") DirectExchange confirmExchange
) {
return BindingBuilder.bind(confirmQueue).to(confirmExchange).with(CONFIRM_ROUTING_KEY);
}
}
package com.mq.rabbit.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import javax.annotation.PostConstruct;
@Slf4j
public class MyCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback{
@Autowired
private RabbitTemplate rabbitTemplate;
// 注入
@PostConstruct
public void init() {
rabbitTemplate.setConfirmCallback(this);
rabbitTemplate.setReturnsCallback(this);
}
// 交换机确认回调方法
@Override
public void confirm(CorrelationData correlationData, boolean ack, String reason) {
String id = correlationData != null ? correlationData.getId() : "";
if (ack) {
log.info("交换机收到ID为{}的消息", id);
} else {
log.info("交换机还未收到ID为{}的消息,由于原因{}", id, reason);
}
}
@Override
public void returnedMessage(ReturnedMessage returnedMessage) {
log.error("消息{},被交换机{}退回");
}
}
3.生产者
package com.mq.rabbit.controller;
import com.mq.rabbit.config.ConfirmConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* 发布确认
*/
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ConfirmProducerController {
@Autowired
RabbitTemplate rabbitTemplate;
@GetMapping("/sendMessage/{message}")
public void sendMessage(@PathVariable String message) {
CorrelationData correlationData = new CorrelationData("1");
rabbitTemplate.convertAndSend("confirm.exchange", "key1", message, correlationData);
log.info("发送消息为{}", message);
}
}
4.消费者
package com.mq.rabbit.consumer;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* 接收消息
*/
@Component
@Slf4j
public class ConfirmConsumer {
@RabbitListener(queues = "confirm.queue")
public void receiveMessage(Message message) {
String msg = new String(message.getBody());
System.out.println(msg);
log.info("接收到confirm.queue队列的消息{}", msg);
}
}