题目
RocketMQ 消息消费失败如何处理?请设计完整的异常处理方案
信息
- 类型:问答
- 难度:⭐⭐
考点
消息重试机制,死信队列设计,消费幂等性保障
快速回答
RocketMQ 消息消费失败处理的核心方案:
- 自动重试机制:通过返回
RECONSUME_LATER触发阶梯式重试 - 死信队列(DLQ):超过最大重试次数的消息自动转入死信队列
- 幂等性设计:使用唯一键+状态机/分布式锁避免重复消费
- 人工干预:通过控制台查询和重发死信消息
1. 核心处理流程

(图片来源:RocketMQ 官网)
当消费者处理消息失败时,完整的异常处理流程:
- 消费者返回
ConsumeConcurrentlyStatus.RECONSUME_LATER - 消息进入重试队列,延迟时间按阶梯递增(1s, 5s, 10s, 30s...)
- 达到最大重试次数(默认16次)后转入死信队列
- 监控系统告警并触发人工处理
2. 代码实现示例
// 消费者实现(带重试逻辑)
public class OrderConsumer implements MessageListenerConcurrently {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List msgs,
ConsumeConcurrentlyContext context) {
try {
// 业务处理(如数据库操作)
processOrder(msgs.get(0));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} catch (BusinessException e) {
// 可重试异常
log.error("消费失败,触发重试", e);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
} catch (FatalException e) {
// 不可重试异常(如数据格式错误)
log.error("严重错误,跳过重试", e);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 避免死循环
}
}
}
// 死信队列消费者
public class DLQConsumer {
// 订阅:%DLQ%消费组名
} 3. 关键配置项
| 参数 | 默认值 | 说明 |
|---|---|---|
| maxReconsumeTimes | 16 | 最大重试次数 |
| messageDelayLevel | 1s 5s... | 重试延迟级别 |
| enableDLQ | true | 死信队列开关 |
4. 幂等性设计实践
避免重试导致重复消费的常用方案:
// 基于唯一业务ID的幂等处理
public void processOrder(MessageExt msg) {
String orderId = msg.getKeys();
if (redis.setnx("lock:" + orderId, "1", 300)) {
try {
if (!orderService.isProcessed(orderId)) {
// 执行业务操作
}
} finally {
redis.del("lock:" + orderId);
}
}
}5. 最佳实践
- 重试策略:
- 非幂等操作使用有限次数重试
- 网络抖动等临时错误应重试
- 死信处理:
- 单独部署死信消费者
- 记录完整错误上下文
- 提供管理界面人工重发
- 监控:
- 监控死信队列堆积量
- 配置重试率告警(如 >5%)
6. 常见错误
- 错误1:捕获所有异常返回
SUCCESS
→ 导致严重错误消息丢失 - 错误2:无限制重试导致消息积压
→ 应设置合理的 maxReconsumeTimes - 错误3:未处理死信消息
→ 需部署独立消费者并设置告警
7. 扩展知识
- 事务消息:半消息机制保障本地事务与消息发送一致性
- 顺序消息:失败时需保证同一队列内消息顺序重试
- 消息轨迹:通过
traceTopic配置追踪消息全生命周期