0
点赞
收藏
分享

微信扫一扫

【自用RabbitMq】生产及发送消息

Mhhao 2022-03-12 阅读 32

队列配置

import org.springframework.amqp.core.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.stereotype.Component;

@Configuration
@Component
public class xxxxxRabbitMqConfig {

    // 交换机名
    @Value("${xxxxxxxxxx}")
    private String 交换机名;

    // 队列名
    @Value("${xxxxxxxxxx}")
    private String 队列名;

    // Routingkey
    @Value("${xxxxxxxx}")
    private String Routingkey;

    // 声明 死信队列交换机
    @Bean("exchange")(可以自定义名字)
    public DirectExchange exchange() {
        return new DirectExchange(channelOrgChangeExchange);
    }

    // 声明队列
    @Bean("queue") (可以自定义名字)
    public Queue queue() {     // 导包: org.springframework.amqp.core
        return new Queue(channelOrgChangeQueue, true);
        // return QueueBuilder.durable(channelOrgChangeQueue).withArguments(null).build();
    }

    // 绑定交换机和队列
    @Bean(绑定需要和前面定义名字关联)
    public Binding queuebBindingX(@Qualifier("queue") Queue queue,
                                  @Qualifier("exchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with(channelOrgChangeKey);
    }
}

消息发送

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.CorrelationData;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.messaging.handler.annotation.Headers;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import com.alibaba.fastjson.JSON;

--------------------------------------------------------------	
 	@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private RedisTemplate redisTemplate;
---------------------------------------------------------------
	//可以自定义发送消息实体 赋值转为json字符串
	String sendMsg = JSON.toJSONString("发送消息实体");
	
	CorrelationData correlationData = new CorrelationData();
                // 绑定消息发送确认回调方法
                rabbitTemplate.setConfirmCallback(confirmCallback);
                // 发送消息之前将消息存入 redis中 k消息id v发送消息
                archiveMsg(correlationData.getId(), sendMsg);
                rabbitTemplate.convertAndSend(交换机名字, routingkey, sendMsg, correlationData);
                log.info("mq发送完成:消息[{}]", sendMsg);
/**
     * 交换机不管是否收到消息的一个回调方法
     *
     * @param correlationData 消息相关数据
     * @param ack             交换机是否收到消息, true(ack) false(nack)
     * @param cause           为收到消息的原因: 异常信息
     */
    private final RabbitTemplate.ConfirmCallback confirmCallback = (CorrelationData correlationData, boolean ack, String cause) -> {
        // mQ 发消息唯一id
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            // 发送成功
            if (!StringUtils.isEmpty(id)) {
                redisTemplate.delete(id);
            }
        } else {
            // 发送失败
            repeatMsg(id);
        }
    };
// 发送消息存储Redis
    private void archiveMsg(String id, String msg) {
        if (com.alibaba.druid.util.StringUtils.isEmpty(id)) {
            log.error("rabbitmq收到未知的空消息!");
            return;
        }
        log.info("消息存档,消息Id[{}]", id);
        redisTemplate.opsForSet().add(id, msg);
    }


    // 消息发送失败重新发送
    private void repeatMsg(String id) {
        if (com.alibaba.druid.util.StringUtils.isEmpty(id)) {
            log.error("rabbitmq收到未知的空消息!");
            return;
        }
        // redis中获取到失败的消息
        String msg = redisTemplate.opsForValue().get(id).toString();
        CorrelationData correlationData = new CorrelationData();
        correlationData.setId(id);
        // 绑定消息发送确认回调方法
        rabbitTemplate.setConfirmCallback(confirmCallback);
        // 重新发送消息
        rabbitTemplate.convertAndSend(交换机, routingkey, msg);
    }
举报

相关推荐

0 条评论