1. 生产者不丢失数据
1.1 开启事务模式
RabbitTemplate代码:
@Bean
@Autowired
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
rabbitTemplate.setChannelTransacted(true);
return rabbitTemplate;
}
(1)生产者发送消息,若是MQ服务器挂掉,那么程序会不断尝试重试,直至broker
恢复,会重新接收这个消息。
(2)若是消费者在ack之前挂掉,MQ服务器会将这条消息恢复,若是长时间没有收到consumer
端的确认消息,那么会将消息从unacked
状态转化为ready
状态。
(3)若是消费者处理消息期间抛出异常,MQ服务器会收到一个nack
或者reject
,MQ服务器也会恢复这条消息。
开启事务会大幅降低消息发送及接收效率,因为当已经有一个事务存在时,后面的消息是不能被发送或者接收(对同一个consumer而言)的。
1.2 confirm模式
为了producer
端知道消息是否进入queue
,可以使用confirm
和return
来代替事务。
confirm和return的配置:
//消息的确认机制(confirm);
connectionFactory.setPublisherConfirms(true);
connectionFactory.setPublisherReturns(true);
confirm和return的区别和联系:
- confirmCallBack:消息从生产者到达
exchange
时返回ack
,消息未到达exchange
返回nack
; - returnCallBack:消息进入
exchange
但未进入queue
时会被调用。
官网对confirm和return的描述:
大概意思就是:在rabbit Template
发送操作完成时,channels
才会关闭。在连接工厂(ConnectionCache
)满的情况下,缓存中有空间时,channel
不会关闭,直到confirm/return
处理完成。
使用confirm
时,将在接受到ack
时关闭channels
;使用return
时,通道会整整持续5s的时间。通常建议将Connection Cache
设置足够大的值,以便发布的消息的channel
可以返回连接池而不是关闭。当看到channel
快速打开或者关闭时,应该考虑增加连接池大小以减少服务器的开销。
1.2.1 生产者消息丢失的情况
(1)rabbitmq由于短暂的网络异常,导致消息发送了出去,但是未到exchange,连接可以短时间恢复。
(2)rabbitmq服务器挂掉且长时间无法恢复,消息无法发送。
1.2.2 生产者消息发送失败(网络异常)
当消息发送失败,一般两种方式处理这个消息:
- 自动重发;
- 系统预警人工处理;
配置文件源码:
rabbitTemplate.setConfirmCallback(new ConfirmCallback() {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
log.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
}
});
rabbitTemplate.setReturnCallback(new ReturnCallback() {
@Override
public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
log.info("消息丢失:exchange({}),route({}),replyCode({}),replyText({}),message:{}", exchange, routingKey, replyCode, replyText, message);
}
});
我们可以看到,在ReturnCallback
中,返回的参数是Message
对象,我们可以获取消息内容
,exchange
,routingKey
这些信息的。
但是在ConfirmCallback
中,确是没有消息信息,只有一个correlationData[ˌkɒrəˈleɪʃn] 相关性的
,并且我们看到他的日志,打印出来还是null。
输出日志:
2019-03-10-00-52 [AMQP Connection 127.0.0.1:5672] [com.Configuration.MyAMQPConfig]
[INFO] - 消息发送成功:correlationData(null),ack(false),cause
(channel error; protocol method: #method<channel.close>(reply-code=404, reply-text=NOT_FOUND - no
exchange 'exchange_direct_no' in vhost '/', class-id=60, method-id=40))
于是我们打开correlationData
的源码:
可以看到里面的属性只有一个id。
public class CorrelationData implements Correlation {
private volatile String id;
public CorrelationData() {
}
public CorrelationData(String id) {
this.id = id;
}
public String getId() {
return this.id;
}
public void setId(String id) {
this.id = id;
}
public String toString() {
return "CorrelationData [id=" + this.id + "]";
}
}
于是我们打开RabbitTemplate send/convertAndSend
方法的源码:
发现里面含有CorrelationData对象,很显然我们在发送消息的时候,消息信息和correnlationData.id
属性进行了绑定,我们若是可以根据id拿到消息,那么就可以进行“重试”或者“预警”等操作了。
于是我们扩展correlationData
类,将id和消息属性绑定起来。
public class CorrelationData extends
org.springframework.amqp.rabbit.support.CorrelationData {
//消息体
private volatile Object message;
//交换机
private String exchange;
//路由键
private String routingKey;
//重试次数
private int retryCount = 0;
我们在发送消息的时候,可以发送correlationData
扩展对象,在我们confirm
的ack=false
的情况下,于是我们就可以拿到消息主体了。
@Test
public void contextLoads() {
Map<String, Object> map = new HashMap<>();
Book book = new Book("西游记", "120.00");
//使用继承扩展的CorrelationData 、id消息流水号
CorrelationData correlationData =
new CorrelationData(UUID.randomUUID().toString());
correlationData.setMessage(book);
correlationData.setExchange("exchange_direct_no");
correlationData.setRoutingKey("ord");
try {
rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book, correlationData);
} catch (AmqpConnectException e) {
System.out.println("保存信息编号:" + correlationData);
}
}
现在我们可以拿到消息主体,也可以拿到rabbitTemplate,那么我们是否可以在confirm回调方法中再次重试?
ConfirmCallback回调函数:
public class MessageConfirmCallback implements RabbitTemplate.ConfirmCallback {
private static final Logger logger = LoggerFactory.getLogger(MessageConfirmCallback.class);
private RabbitTemplate rabbitTemplate;
public MessageConfirmCallback(RabbitTemplate rabbitTemplate) {
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
if (ack == true) {
logger.info("消息发送成功:correlationData({}),ack({}),cause({})", correlationData, ack, cause);
} else {
if (correlationData instanceof com.Configuration.CorrelationData) {
com.Configuration.CorrelationData messageCorrelationData = (com.Configuration.CorrelationData) correlationData;
String exchange = messageCorrelationData.getExchange();
Object message = messageCorrelationData.getMessage();
String routingKey = messageCorrelationData.getRoutingKey();
int retryCount = messageCorrelationData.getRetryCount();
//重试次数+1
((com.Configuration.CorrelationData) correlationData).setRetryCount(retryCount + 1);
rabbitTemplate.convertSendAndReceive(exchange, routingKey, message, correlationData);
}
}
}
}
注意事项:
但是在主线程发送消息的过程中,rabbitMQf服务器关闭,这时候主程序和ConfirmCallback
线程都会等待Connection恢复,然后重新启动rabbitmq
,当应用程序重新建立connection
之后,两个线程都会死锁。
解决方案:
当ack=fasle
的情况下,可以将消息存到缓存中,定时发起任务重发。
1.2.2 生产者消息发送失败(MQ服务器异常)
对于这种情况,confirm是没有发送出去的,但是消息丢失怎么处理,但是会抛出AmqpConnectException
异常,我们可以捕获该异常,然后将msgId
也就是CorrelationData
对象保存即可。
@Test
public void contextLoads() {
Map<String, Object> map = new HashMap<>();
Book book = new Book("西游记", "120.00");
CorrelationData msgId=new CorrelationData();
try {
rabbitTemplate.convertAndSend("exchange_direct_no", "ord", book,msgId);
}catch (AmqpConnectException e){
System.out.println("保存信息编号:"+msgId);
}
推荐阅读:
https://www.jianshu.com/p/9aec19a910b1
https://blog.csdn.net/qq315737546/article/details/66475103
https://www.colabug.com/2325507.html
https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases