目录

RocketMQ 积压大量消息处理(搬运工模式)

这篇文章是对 https://mp.weixin.qq.com/s/ih6hhTPTAgfT0L6MLObmqA 的学习,主要是针对 RocketMQ 积压大量消息的场景,分享一些排查思路和解决方案。

本文只聚焦判断逻辑和关键取舍
相关细节在 wiki RocketMQ 积压大量消息处理 中补充。

在使用 RocketMQ 的线上系统一旦遭遇突发流量,消息系统往往最先扛不住。生产端在短时间内持续放量,而消费端与下游处理链路无法同步放大,Topic 中的消息开始堆积,延迟拉长,告警随之而来。

这类问题如果只停留在“多加几台消费者机器”,结果通常并不理想。真正决定吞吐上限的,并不是机器数量,而是 RocketMQ 的队列模型,以及下游系统的承载边界。


一、先把模型想清楚

在 RocketMQ 里,消息的并行能力并不是无限的。

  • Topic 是逻辑分类,用来区分业务域。

  • Queue 才是真正决定并行度的实体。一个 Topic 会被切成多个 Queue。

  • Consumer Group 内部做负载均衡,但有一个硬约束:

    同一个 Group 中,一个 Queue 在同一时刻只能被一个 Consumer 实例消费。

换句话说,Queue 像窗口,Consumer 像办事员。窗口不够,多来的办事员只能站着。

这条规则,决定了很多“加机器没效果”的结局。


二、积压从哪里来

积压本身并不神秘,本质只有一个前提:

生产速度长期高于消费速度。

但这句话落到系统里,往往表现为不同的形态:

  • 消费逻辑涉及数据库、RPC、锁竞争,单条消息处理时间被拉长;
  • Topic 的 Queue 数量偏少,并行能力被封死;
  • Broker 的磁盘或网络 IO 已接近极限,读写开始排队。

因此,积压不是一个原因,而是一种结果。


三、为什么“加 Consumer”常常无效

假设一个 Topic 只有 4 个 Queue。

  • 4 个消费者实例:刚好每人一个 Queue。
  • 扩到 20 个实例:仍然只有 4 个在真正干活,其余实例空转。

吞吐上限没有任何变化。

这也是为什么单纯扩容消费者,经常看不到积压下降,甚至把下游数据库拖垮。


四、直接扩 Queue,为什么救不了存量

直觉上,Queue 少就加 Queue。但在积压已经形成之后,这个操作对“历史消息”几乎没有帮助。

原因很简单:

  • RocketMQ 按 Queue 存储消息;
  • 已经积压的消息,固定躺在旧 Queue 中;
  • 新增 Queue 只对新消息生效。

所以,扩 Queue 更多是一个防止继续积压的长期手段,而不是即时止血方案。


五、清存量的核心思路:换一条更宽的通道

当旧 Topic 的 Queue 已经成为并行瓶颈时,硬撑没有意义。

比较可控的做法是: 把存量消息先搬出来,放到一个 Queue 足够多的新 Topic 中处理。

这并不是“优化消费逻辑”,而是通过空间换时间。


六、“搬运工模式”的实际落地

1)先做一件很“笨”的事:只搬,不处理

新增一个 Consumer,只做三件事:

  1. 从旧 Topic 拉消息;
  2. 不做任何业务逻辑;
  3. 原样写入一个新 Topic(临时 Topic)。

因为不查库、不调接口,瓶颈基本只剩 IO,搬运速度通常远高于正常消费。

旧 Topic 的积压量,会在短时间内明显下降。

2)新 Topic 的设计目标只有一个:并行

临时 Topic 的 Queue 数,需要明显高于旧 Topic,例如 50、100。

目的很直接: 让后续的业务消费可以被拆散到更多实例上执行。

3)业务 Consumer 改为消费新 Topic

此时才恢复完整业务逻辑。

由于 Queue 足够多,消费并行度被拉开,总体处理速度才真正提升。


七、这个方案不是“无脑可用”

顺序问题

如果消息存在严格顺序要求(例如同一订单的状态流转),搬运过程中必须使用同一个业务 key 做队列路由,保证同一 key 的消息仍然落在同一个 Queue 中。

否则,乱序带来的不是延迟,而是业务错误。

Broker 压力

搬运意味着 Broker 同时承受大量读和写,IO 压力几乎翻倍。

如果磁盘、网络已经接近红线,贸然搬运,很容易把 MQ 本身打挂。

下游承载

并行度一旦拉开,最先扛不住的往往不是 MQ,而是数据库或第三方接口。

如果积压本身就是 DB 慢导致的,放大并发只会制造二次事故。


八、什么时候不值得“救全量”

并不是所有积压都必须清完。

日志、埋点、非核心通知类消息,在系统已经承压的情况下,继续硬处理只会拖垮主链路。

这时理性的选择是:

  • 跳过历史 offset,让系统回到实时水位;
  • 或转存到死信队列,低峰期再处理;
  • 必要时直接丢弃非关键数据。

系统先活下来,比数据完整更重要。


九、事后必须追的不是“怎么清”,而是“为什么慢”

积压只是表象,真正需要复盘的是消费能力下降的根因,例如:

  • 慢 SQL、索引缺失、锁竞争;
  • 外部接口 RT 拉长,引发重试风暴;
  • 消费线程池或连接池耗尽;
  • Topic 设计阶段缺乏容量评估;
  • 没有削峰、限流、降级手段。

如果这些问题不解决,再清一次,还会再来一次。


十、结论

  • 积压的前提永远是:生产 > 消费
  • 在集群消费模式下,并行度上限 = Queue 数
  • 扩 Consumer 超过 Queue 数,必然空转
  • 扩原 Topic 的 Queue,救不了历史消息
  • 清存量要么优化消费本身,要么换到更多 Queue 的新通道
  • 所有提速操作之前,先确认顺序、Broker IO、下游承载是否安全