题目
RocketMQ 消息消费失败后如何正确处理?
信息
- 类型:问答
- 难度:⭐⭐
考点
消息重试机制,死信队列设计,消费幂等性保障
快速回答
处理RocketMQ消费失败的核心要点:
- 重试机制:默认重试16次,间隔逐渐增加
- 死信队列:超过最大重试次数的消息自动进入%DLQ%开头的特殊队列
- 幂等设计:通过唯一键+状态机/数据库唯一索引保证重复消费安全
- 错误处理:根据业务返回
RECONSUME_LATER或记录错误日志
一、核心处理流程

当消费者返回RECONSUME_LATER(或抛出异常)时触发重试机制
二、重试机制详解
// 消费者代码示例
consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
try {
// 业务处理逻辑
processMessage(msgs);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (Exception e) {
// 记录错误日志
log.error("消费失败", e);
// 触发重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
});重试策略:
- 第1-3次:10秒间隔
- 第4-6次:30秒间隔
- 第7-16次:1分钟间隔
- 超过16次进入死信队列
三、死信队列处理
特征:
- 队列名称:
%DLQ%消费组名 - 消息属性:保留原始消息所有属性
- 特殊字段:
RECONSUME_TIMES记录重试次数
处理方案:
- 创建独立消费者订阅死信队列
- 实现告警通知(邮件/短信)
- 人工干预或自动修复脚本
四、幂等性保障方案
// 基于唯一业务ID的幂等处理
public boolean processWithIdempotent(Message msg) {
String msgId = msg.getMsgId();
String bizId = msg.getUserProperty("ORDER_ID");
// 1. 检查Redis幂等键
if (redis.exists("consumed:" + bizId)) {
return true; // 已处理
}
// 2. 数据库唯一约束
try {
insertToDB(bizId, msg); // 数据库有唯一索引
} catch (DuplicateKeyException e) {
return true;
}
// 3. 状态机校验
OrderState state = orderService.getState(bizId);
if (state == OrderState.PROCESSED) {
return true;
}
// 实际业务处理
processBusiness(msg);
// 设置处理标记
redis.setex("consumed:" + bizId, 24*3600, "1");
return true;
}五、最佳实践
- 重试调优:通过
maxReconsumeTimes参数调整最大重试次数 - 错误隔离:不同错误类型采用不同处理策略(网络错误重试/数据错误人工介入)
- 监控配置:监控死信队列堆积情况,设置阈值告警
- 事务消息:对资金类操作使用事务消息保证最终一致性
六、常见错误
- ❌ 捕获异常后返回
SUCCESS导致消息丢失 - ❌ 未处理死信导致消息永久堆积
- ❌ 依赖
msgId做幂等(不同投递msgId不同) - ❌ 在消费逻辑中写无限重试循环
七、扩展知识
- 顺序消息:失败时返回
SUSPEND_CURRENT_QUEUE_A_MOMENT暂停队列 - 批量消费:局部失败需构建
successList部分提交 - 消息轨迹:通过
traceTopic配置跟踪消息生命周期