侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

如何保证RocketMQ消息不丢失?请设计一个高可靠消息方案

2025-12-6 / 0 评论 / 3 阅读

题目

如何保证RocketMQ消息不丢失?请设计一个高可靠消息方案

信息

  • 类型:问答
  • 难度:⭐⭐

考点

消息可靠性机制,事务消息,消费幂等性,高可用部署

快速回答

保证RocketMQ消息不丢失需要端到端的解决方案:

  • 生产者端:使用事务消息+本地事务表,开启同步刷盘和同步复制
  • Broker端:主从同步复制+同步刷盘,多副本部署
  • 消费者端:手动ACK+重试队列+消费幂等设计
  • 监控:配置消息轨迹和监控告警
## 解析

一、消息丢失的三大场景

1. 生产者发送丢失

  • 网络抖动导致发送失败
  • 异步发送未处理SendCallback
  • 事务消息未正确提交

2. Broker存储丢失

  • 未开启同步刷盘(异步刷盘时宕机)
  • 主从同步延迟(异步复制)
  • 磁盘损坏(无多副本)

3. 消费者处理丢失

  • 自动ACK导致消息未处理完成就确认
  • 消费逻辑异常未重试
  • 重复消费导致业务异常

二、完整解决方案

1. 生产者保障(代码示例)

// 事务消息发送(半消息机制)
TransactionSendResult result = producer.sendMessageInTransaction(msg,
new LocalTransactionExecuter() {
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 1. 执行本地事务(记录事务表)
dbService.process(msg);
// 2. 返回COMMIT_MESSAGE
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
// 3. 失败则回滚
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}, null);

// 必须设置同步发送+重试
producer.setRetryTimesWhenSendFailed(3);
producer.setSendMsgTimeout(5000);

关键配置:

  • producerGroup使用事务消息
  • 同步发送+超时控制
  • 本地事务表记录消息状态

2. Broker高可用部署

架构要求:

  • 同步刷盘:flushDiskType=SYNC_FLUSH
  • 同步复制:brokerRole=SYNC_MASTER
  • 多副本部署:至少1主2从
  • Dledger模式:基于Raft协议实现自动选主

配置示例:

# broker-a.properties
brokerClusterName=MyCluster
brokerName=broker-a
brokerId=0 # 0表示Master
brokerRole=SYNC_MASTER
flushDiskType=SYNC_FLUSH
enableDLegerCommitLog=true # 开启Dledger

3. 消费者保障

// 手动ACK+重试策略
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
for (MessageExt msg : msgs) {
// 1. 业务处理(需幂等)
processWithIdempotency(msg);
}
// 2. 成功则返回CONSUME_SUCCESS
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 3. 失败则挂起重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});

// 关键配置
consumer.setMaxReconsumeTimes(5); // 最大重试次数
consumer.setSuspendCurrentQueueTimeMillis(3000); // 重试间隔

幂等设计方法:

  • 数据库唯一键约束
  • Redis原子操作记录消息ID
  • 消息携带业务唯一ID(如订单号)

三、监控与运维

  • 消息轨迹:开启traceTopicEnable=true
  • 监控指标
    • 未消费消息堆积量(msgBack
    • 发送/消费TPS
    • 存储水位(diskFallBehind
  • 告警规则:消息堆积超过1万条立即告警

四、常见错误

  • 误区1:仅依赖自动ACK机制
  • 误区2:Broker使用异步刷盘+异步复制
  • 误区3:未处理事务消息的回查机制
  • 误区4:消费逻辑无幂等设计

五、扩展知识

  • 顺序消息场景:需保证分区内有序,避免并发消费
  • 延迟消息:使用message.setDelayTimeLevel但注意最大延迟2小时
  • 死信队列:重试16次后进入%DLQ%consumerGroup