题目
RocketMQ 消息消费失败后如何实现可靠的重试机制?
信息
- 类型:问答
- 难度:⭐⭐
考点
消息重试机制,死信队列设计,消费幂等性
快速回答
RocketMQ 通过多级重试机制保证消息可靠消费:
- 顺序消息:在客户端自动重试(默认16次)
- 普通消息:服务端延时重投(默认16次,时间间隔递增)
- 死信队列:超过最大重试次数的消息转入特殊队列
- 幂等性:需业务端通过唯一键+状态机实现
一、核心原理
RocketMQ 的重试机制分为两个维度:
- 客户端重试(顺序消息):消费失败后立即在客户端重试,由
DefaultMQPushConsumer控制 - 服务端重试(普通消息):消费失败后消息返回Broker,按延时级别重新投递
二、重试流程
// 消费者配置示例
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name");
consumer.setMaxReconsumeTimes(10); // 最大重试次数
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
// 业务处理逻辑
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 根据异常类型决定重试策略
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});三、重试策略详解
| 重试次数 | 延时时间 | 对应延时级别 |
|---|---|---|
| 1 | 10秒 | 3 |
| 2 | 30秒 | 4 |
| 3 | 1分钟 | 5 |
| ≥4 | 2小时 | 14 |
(默认最大16次,可通过 maxReconsumeTimes 调整)
四、死信队列处理
当重试超过阈值时,消息进入死信队列:
- 命名规则:
%DLQ% + 消费者组名 - 处理方式:
- 人工干预:通过控制台查看并处理
- 自动订阅:独立消费者订阅死信队列
五、幂等性设计最佳实践
// 基于Redis的幂等校验示例
public boolean checkIdempotent(String messageId) {
String key = "msg:" + messageId;
// SETNX 实现原子操作
Boolean result = redisTemplate.opsForValue().setIfAbsent(key, "processed", 24, TimeUnit.HOURS);
return Boolean.TRUE.equals(result);
}推荐方案:
- 唯一标识:使用
MessageExt.getMsgId()+ 业务主键 - 状态机:结合数据库事务更新状态
- 分布式锁:对关键资源加锁
六、常见错误
- 误区1:在消费逻辑中捕获异常但不返回
RECONSUME_LATER - 误区2:无限重试导致系统雪崩(需合理设置
maxReconsumeTimes) - 误区3:未处理死信消息导致数据丢失
- 误区4:依赖
msgId做幂等(相同消息不同投递msgId不同)
七、扩展知识
- 消息轨迹:通过
traceTopicEnable=true开启重试轨迹追踪 - 重置消费位点:对死信队列可通过
resetOffsetByTime重新消费 - 延时消息:与重试机制共用
messageDelayLevel配置