rabbitmq导致丢失数据—线上问题
数据的丢失问题,可能出现在生产者、MQ、消费者中,咱们从 RabbitMQ来分析一下吧。
1.如何保证生产者消息不丢失
以mq实现异步消峰为例,系统需要实现发送手机验证码,采用rabbitmq发送消息给手机服务,消费消息后发送短信
-
生产者
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = “sms.sms.queue”, durable = “true”,
arguments = @Argument(name = “x-message-ttl”,value = “3000”,type = “java.lang.Integer”)),
exchange = @Exchange(value = “mall.sms.exchange”,
ignoreDeclarationExceptions = “true”),
key = {“sms.verify.code”}))
public void listenSms(Map<String, String> msg, Message message, Channel channel) throws Exception {
if (msg == null || msg.size() <= 0) {
// 放弃处理
return;
}
// 采用手动应答模式, 手动确认应答更为安全稳定
String phone = msg.get(“phone”);
String code = msg.get(“code”);if (StringUtils.isBlank(phone) || StringUtils.isBlank(code)) { // 放弃处理 return; } LOGGER.info("消息服务 正在给手机号为{}发送短信,code: {}", phone, code); // 第一个参数是消息标识, 第二个是批量确认; false当前消息确认, true此次之前的消息确认 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); }
2.Rabbitmq如何保证消息可靠性
//给延迟队列发送消息
amqpTemplate.convertAndSend(QueueEnum.QUEUE_TTL_ORDER_CANCEL.getExchange(), QueueEnum.QUEUE_TTL_ORDER_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
//给消息设置延迟毫秒值
message.getMessageProperties().setExpiration(String.valueOf(delayTimes));
//给消息设置持久化,防止rabbitmq宕机导致消息丢失
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
}
});
3.消费者端如何保证消息消费
rabbitmq:
host: localhost
port: 5672
virtual-host: /mall
username: mall
password: mall
publisher-confirms: true #如果对异步消息需要回调必须设置为true
# 发布返回
publisher-returns : true
## 消费端配置
# 手动ack
listener.simple.acknowledge-mode= : manual