侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

RocketMQ 如何保证消息不丢失?请设计一个高可靠的消息发送方案

2025-12-7 / 0 评论 / 6 阅读

题目

RocketMQ 如何保证消息不丢失?请设计一个高可靠的消息发送方案

信息

  • 类型:问答
  • 难度:⭐⭐

考点

消息可靠性机制,高可用架构设计,事务消息实现

快速回答

保证消息不丢失的核心方案:

  • 生产者端:启用同步发送 + 事务消息 + 重试机制
  • Broker端:同步刷盘 + 主从同步复制
  • 消费者端:手动ACK + 消费重试机制
  • 系统设计:消息轨迹追踪 + 监控告警
## 解析

一、消息丢失的三大场景

1. 生产者发送丢失

  • 网络故障导致发送失败
  • 异步发送未处理SendCallback
  • 事务消息Commit失败

2. Broker存储丢失

  • 未开启同步刷盘时宕机
  • 主从同步复制未完成时主节点宕机
  • 磁盘损坏导致数据丢失

3. 消费过程丢失

  • 自动ACK模式下处理失败
  • 重试次数耗尽进入死信队列
  • 消费者宕机导致消息未处理

二、完整解决方案(代码示例)

1. 生产者端保障

// 事务消息发送(关键代码)  
TransactionMQProducer producer = new TransactionMQProducer("group");
producer.setNamesrvAddr("127.0.0.1:9876");
// 设置事务监听器
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE; 
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // 检查本地事务状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
// 同步发送(必须捕获异常)
try {
    SendResult result = producer.sendMessageInTransaction(msg, null);
    if (result.getSendStatus() != SendStatus.SEND_OK) {
        // 重试逻辑
    }
} catch (Exception e) {
    // 告警 + 重试
}

关键配置:

  • retryTimesWhenSendFailed=3(发送失败重试)
  • sendLatencyFaultEnable=true(故障规避机制)

2. Broker端保障

刷盘策略:

# broker.conf 关键配置
flushDiskType = SYNC_FLUSH  # 同步刷盘
brokerRole = SYNC_MASTER    # 主从同步复制

高可用架构:

  • DLedger模式实现自动选主
  • 多副本部署(至少1主2从)
  • 定期备份CommitLog到对象存储

3. 消费者端保障

// 消费者示例(手动ACK)
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group");
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
    try {
        // 业务处理
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    } catch (Exception e) {
        // 重试(默认16次)
        return ConsumeConcurrentlyStatus.RECONSUME_LATER; 
    }
});
// 关闭自动ACK
consumer.setConsumeMessageBatchMaxSize(1);

关键配置:

  • suspendCurrentQueueTimeMillis=1000(重试间隔)
  • maxReconsumeTimes=16(最大重试次数)

三、最佳实践与监控

1. 消息轨迹追踪

  • 开启traceTopic收集消息全链路状态
  • 使用RocketMQ-Console监控消息轨迹

2. 监控告警体系

  • 监控指标:堆积量、发送耗时、ACK延迟
  • 配置规则:当堆积量>1万时触发告警

3. 容灾设计

  • 同城双活部署(跨机房主从)
  • 定期演练Broker故障切换
  • 死信队列监控与人工干预

四、常见错误

  • 错误1:使用异步发送但未实现回调逻辑
  • 错误2:Broker配置ASYNC_FLUSH(异步刷盘)
  • 错误3:消费者捕获异常后仍返回CONSUME_SUCCESS
  • 错误4:未限制消费线程数导致消息积压

五、扩展知识

  • DLedger协议:基于Raft的分布式共识算法,解决主从数据强一致
  • 消息索引:CommitLog顺序写 + ConsumeQueue索引分离设计
  • 零拷贝:MappedByteBuffer提升文件读写性能