题目
RocketMQ 如何保证消息不丢失?请设计一个高可靠的消息发送方案
信息
- 类型:问答
- 难度:⭐⭐
考点
消息可靠性机制,高可用架构设计,事务消息实现
快速回答
保证消息不丢失的核心方案:
- 生产者端:启用同步发送 + 事务消息 + 重试机制
- Broker端:同步刷盘 + 主从同步复制
- 消费者端:手动ACK + 消费重试机制
- 系统设计:消息轨迹追踪 + 监控告警
一、消息丢失的三大场景
1. 生产者发送丢失
- 网络故障导致发送失败
- 异步发送未处理SendCallback
- 事务消息Commit失败
2. Broker存储丢失
- 未开启同步刷盘时宕机
- 主从同步复制未完成时主节点宕机
- 磁盘损坏导致数据丢失
3. 消费过程丢失
- 自动ACK模式下处理失败
- 重试次数耗尽进入死信队列
- 消费者宕机导致消息未处理
二、完整解决方案(代码示例)
1. 生产者端保障
// 事务消息发送(关键代码)
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
// 执行本地事务
return LocalTransactionState.COMMIT_MESSAGE;
}
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 检查本地事务状态
return LocalTransactionState.COMMIT_MESSAGE;
}
});
// 同步发送(必须捕获异常)
try {
SendResult result = producer.sendMessageInTransaction(msg, null);
if (result.getSendStatus() != SendStatus.SEND_OK) {
// 重试逻辑
}
} catch (Exception e) {
// 告警 + 重试
}关键配置:
- retryTimesWhenSendFailed=3(发送失败重试)
- sendLatencyFaultEnable=true(故障规避机制)
2. Broker端保障
刷盘策略:
# broker.conf 关键配置
flushDiskType = SYNC_FLUSH # 同步刷盘
brokerRole = SYNC_MASTER # 主从同步复制
高可用架构:
- DLedger模式实现自动选主
- 多副本部署(至少1主2从)
- 定期备份CommitLog到对象存储
3. 消费者端保障
// 消费者示例(手动ACK)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
// 业务处理
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 重试(默认16次)
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});
// 关闭自动ACK
consumer.setConsumeMessageBatchMaxSize(1);关键配置:
- suspendCurrentQueueTimeMillis=1000(重试间隔)
- maxReconsumeTimes=16(最大重试次数)
三、最佳实践与监控
1. 消息轨迹追踪
- 开启traceTopic收集消息全链路状态
- 使用RocketMQ-Console监控消息轨迹
2. 监控告警体系
- 监控指标:堆积量、发送耗时、ACK延迟
- 配置规则:当堆积量>1万时触发告警
3. 容灾设计
- 同城双活部署(跨机房主从)
- 定期演练Broker故障切换
- 死信队列监控与人工干预
四、常见错误
- 错误1:使用异步发送但未实现回调逻辑
- 错误2:Broker配置ASYNC_FLUSH(异步刷盘)
- 错误3:消费者捕获异常后仍返回CONSUME_SUCCESS
- 错误4:未限制消费线程数导致消息积压
五、扩展知识
- DLedger协议:基于Raft的分布式共识算法,解决主从数据强一致
- 消息索引:CommitLog顺序写 + ConsumeQueue索引分离设计
- 零拷贝:MappedByteBuffer提升文件读写性能