RabbitMQ服务器因不明原因重启,导致生产者消息投递失败,如何进行可靠投递?
生产者发消息到交换机,交换机/队列不存在或者无法接受消息,要对缓存进行处理
对发送的消息进行备份,用定时任务对未成功的消息进行重新投递
1)SpringBoot版本
配置文件:application.properties
spring.rabbitmq.publisher-confirm-type=correlated添加配置类
import org.springframework.amqp.core.Binding
import org.springframework.amqp.core.BindingBuilder
import org.springframework.amqp.core.DirectExchange
import org.springframework.amqp.core.Queue
import org.springframework.amqp.core.QueueBuilder
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
@Configuration
class ConfirmConfig {
    companion object{
        const val CONFIRM_EXCHANGE_NAME = "confirm_exchange"     // 交换机
        const val CONFIRM_QUEUE_NAME = "confirm_queue"           // 队列
        const val CONFIRM_ROUTING_KEY = "key1"                   // RoutingKey
    }
    // 交换机声明
    @Bean("confirmExchange")
    fun confirmExchange(): DirectExchange = DirectExchange(CONFIRM_EXCHANGE_NAME)
    @Bean("confirmQueue")
    fun confirmQueue(): Queue = QueueBuilder.durable(CONFIRM_QUEUE_NAME).build()
    @Bean
    fun queueBindingExchange(@Qualifier("confirmQueue") queue: Queue, @Qualifier("confirmExchange") exchange: DirectExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }
}生产者
@RestController
@RequestMapping("confirm")
class ProducerController {
    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate
    @GetMapping("sendMsg/{message}")
    fun sendMessage(@PathVariable message: String){
        val correlationData = CorrelationData("1")
        rabbitTemplate.convertAndSend(
            ConfirmConfig.CONFIRM_EXCHANGE_NAME,
            ConfirmConfig.CONFIRM_ROUTING_KEY,
            message, correlationData
        )
        println("发消息内容为:$message")
    }
}消费者
import cn.soldat.springbootrabbitmq.config.ConfirmConfig
import org.springframework.amqp.core.Message
import org.springframework.amqp.rabbit.annotation.RabbitListener
import org.springframework.stereotype.Component
@Component
class Consumer {
    @RabbitListener(queues = [ConfirmConfig.CONFIRM_QUEUE_NAME])
    fun receiveConfirmMessage(message: Message){
        println("接受到的队列confirm.queue消息:${String(message.body)}")
    }
}回退消息
在仅开启了生产者确认机制情况下,交换机接收到消息后,会直接给生产者发送确认消息,如果发现该消息不可路由,则会丢弃消息,不会通知生产者。通过设置mandatory参数,可以在当消息传递过程中不可达目的地时将消息返回给生产者。
配置文件
spring.rabbitmq.publisher-returns=true回调接口
@Component
class MyCallBack: RabbitTemplate.ReturnsCallback {
    @Autowired
    private lateinit var rabbitTemplate: RabbitTemplate
    // 注入到 RabbitTemplate
    @PostConstruct
    fun init() {
        rabbitTemplate.setReturnsCallback(this)
    }
    override fun returnedMessage(returned: ReturnedMessage) {
        println("消息:${returned.message} 被交换机:${returned.exchange} 退回,退回原因:${returned} 路由Key:${returned.routingKey}")
    }
}备份交换机
无法投递的消息将发送给备份交换机,可以设置报警队列,用独立的消费者进行监测和报警。
备份交换机的优先级高于消息回退
配置类添加备份交换机、备份队列、报警队列
@Configuration
class ConfirmConfig {
    companion object{
        const val CONFIRM_EXCHANGE_NAME = "confirm_exchange"     // 交换机
        const val CONFIRM_QUEUE_NAME = "confirm_queue"           // 队列
        const val CONFIRM_ROUTING_KEY = "key1"                   // RoutingKey
        const val BACKUP_EXCHANGE = "backup_exchange"   // 备份交换机
        const val BACKUP_QUEUE_NAME = "backup_queue"    // 备份队列
        const val WARNING_QUEUE_NAME = "warning_queue"  // 报警队列
    }
    // 交换机声明
    @Bean("confirmExchange")
    fun confirmExchange(): DirectExchange{
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE_NAME).withArgument("alternate-exchange", BACKUP_EXCHANGE).build()
    }
    @Bean("confirmQueue")
    fun confirmQueue(): Queue = QueueBuilder.durable(CONFIRM_QUEUE_NAME).build()
    @Bean
    fun queueBindingExchange(@Qualifier("confirmQueue") queue: Queue, @Qualifier("confirmExchange") exchange: DirectExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY);
    }
    @Bean("backupExchange")
    fun backupExchange(): FanoutExchange = FanoutExchange(BACKUP_EXCHANGE)
    @Bean("backupQueue")
    fun backupQueue(): Queue = Queue(BACKUP_QUEUE_NAME)
    @Bean("warningQueue")
    fun warningQueue(): Queue = Queue(WARNING_QUEUE_NAME)
    @Bean
    fun backupQueueBindingBackupExchange(@Qualifier("backupQueue") queue: Queue, @Qualifier("backupExchange") exchange: FanoutExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange)
    }
    @Bean
    fun warningQueueBindingBackupExchange(@Qualifier("warningQueue") queue: Queue, @Qualifier("backupExchange") exchange: FanoutExchange): Binding{
        return BindingBuilder.bind(queue).to(exchange)
    }
}消费者
@Component
class WarningConsumer{
    @RabbitListener(queues = [ConfirmConfig.WARNING_QUEUE_NAME])
    fun receiveConfirmMessage(message: Message){
        val msg = String(message.body)
        println("报警发现不可路由消息:$msg")
    }
}









