题目
RocketMQ如何保证消息不丢失?请从生产者到消费者全链路说明
信息
- 类型:问答
- 难度:⭐⭐
考点
消息可靠性机制,生产者发送策略,Broker存储机制,消费者消费逻辑
快速回答
保证RocketMQ消息不丢失需要全链路控制:
- 生产者端:使用同步发送+重试机制,设置
retryTimesWhenSendFailed,捕获发送异常 - Broker端:配置同步刷盘(flushDiskType=SYNC_FLUSH)和主从同步(SYNC_MASTER)
- 消费者端:业务处理完成后再手动ACK,关闭自动提交(enableAutoCommit=false)
- 容灾设计:部署多副本集群,启用DLQ(死信队列)处理重试失败消息
1. 消息丢失风险点
RocketMQ消息传递全链路中的风险环节:
- 生产者发送失败:网络抖动、Broker宕机
- Broker存储丢失:异步刷盘时宕机、主从同步延迟
- 消费者处理失败:自动提交offset后业务异常
2. 生产者端保障
核心机制:同步发送 + 重试策略
// 示例:可靠生产者配置
DefaultMQProducer producer = new DefaultMQProducer("PID_ORDER");
producer.setNamesrvAddr("127.0.0.1:9876");
// 关键配置
producer.setRetryTimesWhenSendFailed(3); // 同步发送重试次数
producer.setSendMsgTimeout(5000); // 超时时间
try {
Message msg = new Message("OrderTopic", "订单创建".getBytes());
SendResult result = producer.send(msg, 1000); // 同步发送带超时
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 记录发送失败日志并告警
}
} catch (Exception e) {
// 重试后仍失败,持久化到DB等待补偿
}最佳实践:
- 事务消息用于严格一致性场景(如支付)
- 生产环境必须捕获
RemotingException/MQClientException
3. Broker端保障
存储机制:
- 刷盘方式:
- SYNC_FLUSH(同步刷盘):写入PageCache后立即刷盘,性能下降但数据安全
- ASYNC_FLUSH(异步刷盘):默认方式,依赖OS刷盘机制
- 主从复制:
- SYNC_MASTER:从节点写入成功才返回ACK
- ASYNC_MASTER:默认方式,存在数据丢失风险
配置示例(broker.conf):
# 同步刷盘配置
flushDiskType = SYNC_FLUSH
# 主从同步模式
brokerRole = SYNC_MASTER
# 消息存储周期(默认72小时)
fileReservedTime = 72容灾设计:
- 至少部署1主2从跨机架集群
- 启用Dledger模式(基于Raft协议)实现自动选主
4. 消费者端保障
消费逻辑要点:
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
for (MessageExt msg : msgs) {
try {
// 1. 业务处理(需幂等设计)
processOrder(msg);
// 2. 成功后再ACK
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 3. 失败消息重试(默认重试16次)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
});
// 关闭自动提交(默认关闭)
consumer.setEnableAutoCommit(false);关键配置:
suspendCurrentQueueTimeMillis:控制重试间隔maxReconsumeTimes:最大重试次数(超过则投递到DLQ)
5. 监控与应急
- 监控指标:
- 生产者:发送失败率、RT时间
- Broker:PageCache未刷盘消息量、Slave落后条数
- 消费者:积压消息数、重试队列大小
- 应急措施:
- 通过
mqadmin命令查询消息轨迹 - 从DLQ中恢复业务关键消息
- 通过
常见错误
- 误用异步发送且未处理回调
- 消费者捕获异常后仍返回
CONSUME_SUCCESS - Broker磁盘写满导致服务不可用
扩展知识
- 事务消息:二阶段提交实现分布式事务
- 消息轨迹:通过
traceTopicEnable=true开启全链路追踪 - 顺序消息:同一ShardingKey的消息使用MessageQueueSelector