侧边栏壁纸
博主头像
colo

欲买桂花同载酒

  • 累计撰写 1823 篇文章
  • 累计收到 0 条评论

如何保证RocketMQ在生产者发送消息时不丢失?

2025-12-7 / 0 评论 / 4 阅读

题目

如何保证RocketMQ在生产者发送消息时不丢失?

信息

  • 类型:问答
  • 难度:⭐⭐

考点

消息可靠性机制,生产者高可用设计,同步刷盘策略

快速回答

保证生产者消息不丢失的核心方案:

  • 同步发送+重试机制:使用同步发送并配置重试次数
  • 事务消息:对强一致性场景使用事务消息机制
  • Broker高可用:部署主从集群并开启同步刷盘(SYNC_FLUSH)
  • 发送状态检查:严格处理SendResult返回值
## 解析

1. 消息丢失的典型场景

生产者消息丢失主要发生在:

  • 网络故障:消息未到达Broker
  • Broker异常:消息未持久化即宕机
  • 异步发送:未处理发送失败回调

2. 核心解决方案

2.1 同步发送与重试机制

// 创建生产者(同步模式)
DefaultMQProducer producer = new DefaultMQProducer("producer_group");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.setRetryTimesWhenSendFailed(3); // 设置同步发送重试次数
producer.start();

// 发送消息并检查结果
Message msg = new Message("TopicTest", "TagA", "KEY_001", "Hello RocketMQ".getBytes());
SendResult sendResult = producer.send(msg); // 同步阻塞调用
if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
    // 处理发送失败逻辑
}

关键配置

  • setRetryTimesWhenSendFailed:网络异常时自动重试(默认2次)
  • setRetryAnotherBrokerWhenNotStoreOK:Broker异常时切换节点重试

2.2 事务消息(强一致性场景)

TransactionMQProducer producer = new TransactionMQProducer("tx_group");
producer.setTransactionListener(new TransactionListener() {
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        // 执行本地事务
        return LocalTransactionState.COMMIT_MESSAGE; // 提交消息
    }
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        // Broker回调检查事务状态
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
producer.sendMessageInTransaction(msg, null); // 发送事务消息

执行流程

  1. 发送半消息(对消费者不可见)
  2. 执行本地事务
  3. 根据事务状态提交/回滚消息

2.3 Broker高可用配置

  • 同步刷盘flushDiskType = SYNC_FLUSH(默认ASYNC_FLUSH)
  • 主从同步brokerRole = SYNC_MASTER(确保从节点确认写入)
  • 持久化策略:推荐多副本机制(Dledger)

3. 最佳实践

  • 生产环境必配:同步发送 + 重试次数 ≥ 3
  • 关键业务:结合事务消息 + 数据库事务
  • 监控告警:监控SendResult状态和异常日志

4. 常见错误

  • 错误1:使用异步发送但未实现回调接口处理失败
  • 错误2:忽略SendResult返回值状态检查
  • 错误3:Broker单点部署且使用异步刷盘

5. 扩展知识

  • 消息轨迹:通过traceTopicEnable=true追踪消息生命周期
  • 发送延迟监控producer.getDefaultMQProducerImpl().getSendLatencyFaultEnable()
  • 顺序消息场景:同步发送必须配合MessageQueueSelector