目录

RocketMQ 积压大量消息处理方式

目录

这篇文章是对 https://mp.weixin.qq.com/s/ih6hhTPTAgfT0L6MLObmqA 的学习,主要是针对 RocketMQ 积压大量消息的场景,分享一些排查思路和解决方案。

在使用 RocketMQ 承接核心链路流量的系统中,突发流量容易把消费端与下游处理链路推到瓶颈区:生产速度短时间内大于消费速度,Topic 出现大规模积压并触发告警。此类场景的关键不在“盲目扩容消费者”,而在先确认队列模型带来的并行上限与下游承载边界,再选择能真实提升吞吐的处理方式,并把顺序性、Broker I/O、数据库压力等风险纳入救火方案。


1)先理解基础概念

RocketMQ 里最常见的 4 个名词

  • Topic:消息的大类目,类似“一个频道”。例如 order_eventpay_event

  • Message(消息):一条具体的数据记录,例如“订单创建了”“支付成功了”。

  • Queue(队列):Topic 内部会被切成多个“分片”。可以把 Topic 想成一条大河,Queue 就是多条分流的河道。

    • Queue 数越多,理论上并行处理能力上限越高(因为可以多路并行)。
  • Consumer(消费者):真正处理消息的程序/实例,通常部署在多台机器上跑。

https://file.oss.aixfan.com/img/screenshot/2025/12/28/RocketMQ%20%E5%B8%B8%E8%A7%81%E5%90%8D%E8%AF%8D.png

关键规则:集群消费模式下的“一夫一妻制”

文章讲的核心铁律是:

同一个 Consumer Group 里,一个 Queue 同一时刻只能分配给一个 Consumer 实例消费。

用更直白的比喻:

  • Queue 像“窗口”,Consumer 像“办事员”。
  • 一个窗口只能站一个办事员;多来的办事员没有窗口就只能闲着。

这条规则就是为什么“面试官”会嘲笑“盲目加机器”。


2)故事背景

线上突发流量 → 生产者疯狂发消息 → Topic 积压到 1 亿条 → 消费端处理不过来 → 监控告警(延迟、堆积量、失败率上升)。

理解的关键点:

  • 积压不是“消息丢了”,而是“消息还在 MQ 里排队没被处理”。
  • 队列里排队太多,意味着业务处理有延迟,严重时可能影响主链路。

3)原因(为什么会积压?为什么“加机器”常常没用?)

3.1 积压的本质原因:生产速度 > 消费速度

这句话很抽象,拆开看:

  • 生产速度:每秒往 MQ 写多少条消息(QPS)。
  • 消费速度:每秒能处理多少条消息(TPS),而且通常要做业务动作:查库、写库、调用第三方等。

一旦生产速度持续高于消费速度,就会像“进门的人比出门的人多”,队伍越来越长。

3.2 为什么“加 Consumer 机器”可能完全无效

假设:Topic 只有 4 个 Queue(很多系统创建 Topic 默认就是少量队列)。

  • 现在有 2 台机器跑消费者:每台分到 2 个 Queue,刚好都在干活。
  • 扩到 10 台机器:最多只有 4 台能分到 Queue,剩下 6 台没有 Queue 只能围观。 结果:吞吐并不会因为多了 6 台机器而提高。

结论:

Queue 是并行度的“天花板”。Queue 不够,多出来的消费者等于摆设。

3.3 为什么不能直接把原 Topic 的 Queue 改成 100 来救急

直觉上:Queue 少就加 Queue。 但文章强调:这对“已经积压的存量消息”没用。

原因是 RocketMQ 的存储是按 Queue 分开的:

  • 积压的 1 亿条消息已经“躺在旧的 4 个 Queue 里”。
  • 你把 Queue 改到 100 后,RocketMQ 只会把新来的消息写到新 Queue。
  • 那 1 亿条老消息还在旧队列里,仍然只能 4 个消费者并行慢慢啃。 所以“改配置”更多是长期优化,不是即时止血。

4)处理积压(搬运工模式)

核心思想:旧池子太窄,就把水先导到新池子

旧 Topic 的 4 个 Queue 限制死了并行能力。 解决办法不是在旧池子里硬撑,而是把积压先搬到一个“更宽的新池子”。

第一步:写一个“搬运工 Consumer”(不跑业务,只搬运)

搬运工做什么?

  1. 从旧 Topic 读消息(按原来的 4 个 Queue 读)
  2. 不做业务处理(不查库、不算逻辑、不调接口)
  3. 直接把消息写到新 Topic(Topic-Temp)

为什么这样会快?

  • 因为业务处理通常才是慢的(DB、网络、锁),而搬运只做 IO,速度接近上限。
  • 即使旧 Topic 只有 4 个 Queue,也能把“把消息读出来”这件事做得很快。

阶段性结果:旧 Topic 的积压量快速下降,告警先止血。

第二步:新建 Topic-Temp,把 Queue 开大(比如 100)

新 Topic 的设计目标很明确:

让并行度足够高,让业务处理能被分摊到更多机器上。

Topic-Temp:Queue = 100 这样就能部署 100 个业务消费者实例,每个实例稳定拿到 1 个 Queue 干活。

第三步:让真正的业务 Consumer 去消费 Topic-Temp

这时才跑“真实业务逻辑”:查库、写库、扣库存、发通知等。 因为并行度已经拉开,整体处理速度提升。

这一套的本质

  • 旧 Topic:受限于“存量在 4 个 Queue”
  • 新 Topic:用更多 Queue 提供更高并行度 这就是文章说的:空间换时间(新增一个 Topic 作为中转空间,换取更快的处理速度)。

5)结果

体现了三个能力:

  1. 懂 RocketMQ 负载均衡/分配机制:知道 Queue 才是并行度上限。
  2. 能处理“存量”问题:知道改 Queue 只对增量有效,存量要换通道。
  3. 能给可落地的 SOP:搬运→扩队列→并行处理,能在生产救火。

6)风险点(容易忽略,但最致命)

风险 1:顺序消息可能被“搬乱”

什么是顺序消息?

  • 例如订单事件必须按顺序:创建 → 支付 → 完成 如果乱序,下游可能先收到“支付成功”,再收到“订单创建”,业务直接错乱。

为什么搬运会乱序?

  • 搬运通常会多线程/多实例快速读取 + 写入新 Topic,写入时可能打散顺序。

怎么做才相对安全?

  • 搬运时保留 Sharding Key(如订单号),保证同一订单的消息仍进入新 Topic 的同一个 Queue,至少做到“同一订单内部不乱”。

记忆:顺序要求越强,搬运越要“按同一业务键归队”。

风险 2:Broker I/O 被打爆

搬运等于让 Broker 同时承受:

  • 从旧 Topic 大量读(读磁盘/网络)
  • 往新 Topic 大量写(写磁盘/网络)

I/O 直接翻倍,Broker 可能先挂。

对策:

  • 先看监控(磁盘利用率、IO wait、CPU、网络)
  • Broker 已经红了:先扩容 Broker / 降速搬运 / 先限流再搬运

风险 3:下游数据库被 100 并发冲垮(最常见二次事故)

积压很多时候就是因为 DB 慢、接口慢。 这时如果把业务消费者扩到 100:

  • 消费速度确实变快
  • 但 100 个并发一起打 MySQL/第三方,相当于对自己发起 DDoS
  • 结果:DB 崩了,全站 P0

对策:

  • 扩容前先确定瓶颈是否在 DB/外部依赖
  • 必要时:限流、降级、批处理、写入队列后异步合并、加缓存、分库分表等

7)方案 B(止损策略:不是所有消息都值得救)

如果积压的是非核心数据(日志、埋点、非资金通知),继续硬处理可能影响新消息进入、拖垮系统。 此时更成熟的做法是“先保主业务”:

  • Skip Offset:直接跳过老积压,让系统回到实时水位
  • 导入死信队列:把老消息转存,等低峰慢慢补

理解:

先救活病人,再讨论康复训练;先恢复服务,再谈补历史数据。


8)复盘结论(学习落点:止血之后要治本)

积压只是一种症状,真正需要追的是“为什么消费变慢”:

  • DB 变慢?(慢 SQL、锁、连接池不足)
  • 第三方接口超时?
  • 业务代码死锁/线程池打满?
  • 消费逻辑幂等/重试导致雪崩?
  • 流量突增缺乏限流/削峰?

最终目标是:

  • 事故时能用 SOP 快速止血
  • 平时能用架构手段避免再次发生(限流、削峰、隔离、容量评估、压测、监控告警)

9) 细节

积压“搬运一定能救火”

搬运只是止血手段之一,前提是“消费逻辑本身没问题,只是并行度不够”。 如果消费逻辑有缺陷(慢 SQL、外部依赖慢等),搬运后依然会积压。

搬运会让 Broker 同时“读旧 + 写新”,额外制造一倍的网络/磁盘压力。 如果 Broker 已经红线(磁盘 IO、CPU、PageCache、网络),搬运可能先把 MQ 打挂——这时搬运不是救火,是添柴。

方案要成立,必须满足的前置条件:

① 消息语义允许“至少一次”且能容忍重复(或已做幂等)

搬运时常见链路是: 旧 Topic 拉取 → 发到新 Topic → 提交旧 Topic 的 offset 只要中间任一步失败重试,就可能出现重复投递(这是 MQ 常态)。 如果下游写库/扣款/状态流转没有幂等,重复会直接造成业务错账。

必须具备之一:

消费侧业务幂等(推荐:按业务唯一键去重,如订单号 + 事件类型 + 版本号)

或搬运侧引入去重/事务保证(复杂且成本高,救火时不一定来得及)

② 消息顺序要求清晰:能否乱序?是“全局顺序”还是“同一业务键顺序”?

RocketMQ 的“顺序消息”一般是同一 Sharding Key(如订单号)在同一队列内有序,而不是全局有序。 搬运会破坏顺序的典型原因是:并发搬运时把同一订单的消息写进了不同队列。

若要求同一订单内严格顺序:

搬运写入 Topic-Temp 时必须用同样的 key 做队列选择(保持同 key 落同队列)

消费 Topic-Temp 时需要按顺序消费模式处理(或至少保证单 key 串行)

③ Broker 与网络 I/O 有余量,且能承受“读写翻倍”

需要在监控里确认:磁盘利用率、IO wait、写入延迟、网络带宽、Broker 堆外内存等。 一旦资源已爆,优先动作应是:限流/降载/扩 Broker/扩盘,而不是搬运。

④ 下游(DB/外部接口)能承受扩出来的并发

如果积压根因是 DB 慢、第三方慢,扩大并发只会把下游打挂。 这时正确动作往往是:

先定位瓶颈(慢 SQL、连接池、锁、接口超时)

对消费进行限速、分批、熔断、降级

或把重操作异步化/落盘再慢慢补

对“搬运工模式”还必须补上的工程细节

  1. offset 提交时机 必须做到“发到新 Topic 成功后,再提交旧 Topic 的 offset”。否则可能丢消息。 但这样会带来重复风险,因此幂等是配套前提。

  2. 限速与背压 搬运程序不能“光速全开”,需要按 Broker/Topic-Temp/下游承载做限流;否则把 MQ 写爆或把 Topic-Temp 撑满磁盘。 Topic-Temp 的保留策略与容量 Topic-Temp 会额外占用磁盘(甚至比原 Topic 更大,因为重复存储一份)。需要评估 retention、磁盘水位、清理策略。

  3. 消息属性保留 如果原消息依赖 tags、keys、延时等级、重试次数、trace 等属性,搬运时要尽量保留,否则排障与业务行为可能变化。

  4. 新旧两条流如何收口(搬运期间新消息仍在进入原 Topic)。

  • 原 Topic 的“正常消费者”是否继续跑(通常要继续,避免新积压)
  • 搬运只处理“老积压”还是全量
  • 何时停止 Topic-Temp,如何切回常态

10) 如何排查

0)启动条件与目标

触发条件:

  • Topic 积压量持续上升(例如 > 1000 万且仍在增长)
  • 消费延迟持续扩大(例如延迟分钟级/小时级)
  • 业务告警出现:订单/支付/通知延迟、失败率升高、超时升高

明确目标:

  1. 先让系统不再继续恶化(止血)
  2. 再把积压降到可控水位(提速)
  3. 最后恢复常态并防复发(收口)

1)第 1 阶段:快速分诊

目的:判定“加速是否安全”,避免一上来就把 Broker 或 DB 打挂。

1.1 必看监控

MQ 侧:

  • 积压量(消息堆积数):是否持续上涨、上涨速度(每分钟增加多少)
  • 消费 TPS / 生产 TPS:是否生产明显大于消费
  • 消费延迟:是否快速扩大
  • Broker 磁盘水位:是否接近满盘
  • Broker I/O(IO wait、写入/读取延迟):是否接近瓶颈
  • Broker CPU/网络带宽:是否飙高

下游侧(最容易引发二次事故):

  • DB QPS / 连接数 / 慢 SQL 数 / 锁等待
  • 第三方接口 RT / 超时率 / 熔断情况
  • 业务服务线程池/连接池:是否打满、是否大量超时重试

1.2 快速判定“瓶颈在哪”(三选一)

  • A:队列并行度不足(Queue 少) 特征:Topic Queue 数很少;Consumer 实例多但很多空转;单个实例很忙但整体 TPS 上不去
  • B:下游处理慢(DB/外部接口/业务逻辑慢) 特征:DB 慢 SQL、锁等待、接口超时;消费线程堆积;消费失败/重试多
  • C:Broker 资源吃紧(磁盘/IO/网络) 特征:Broker IO wait 高、磁盘写入延迟高、磁盘水位告急、网络打满

判定结果会决定后续能不能“搬运/扩并行”,不然会救火变纵火。


2)第 2 阶段:先止血(避免继续恶化)

2.1 立即执行的通用止血动作

  • 开启/加强限流与削峰(若入口可控):降低生产 TPS,优先保核心链路
  • 关闭非必要重试风暴:避免失败重试把系统拖死(特别是下游超时导致的级联重试)
  • 隔离非核心 Topic/业务:把资源让给核心 Topic

2.2 分诊后的“止血优先动作”

如果判定是 C:Broker 资源吃紧

  • 优先:扩容 Broker / 扩盘 / 降低写入压力 / 限速消费与生产
  • 禁止:先上“搬运工模式”(读旧写新会让 IO 翻倍,极易把 Broker 打挂)

如果判定是 B:下游处理慢

  • 优先:修下游瓶颈(慢 SQL、索引、连接池、缓存、接口熔断降级)
  • 同时:对消费端限速(宁可慢消化,也不要把 DB 冲垮)
  • 暂缓:盲目扩容 100 个消费者(高概率把 DB 打挂)

如果判定是 A:Queue 并行度不足

  • 可以进入“提速方案”(第 3 阶段),但仍需先做风险校验(顺序/I/O/DB)

3)第 3 阶段:提速(核心处理方案选择)

按“能否立刻提升吞吐”从轻到重排列,优先选风险低的。

3.1 轻量提速(优先级最高,改动小)

适用:下游承载尚可、Broker 不红线、但消费效率偏低

  • 提高单实例并发/批量能力(谨慎)

    • 提升消费线程数(非顺序消费场景才适用)
    • 增大批量拉取/批量消费(减少网络/调度开销)
  • 减少无用开销

    • 关闭不必要日志、降低序列化成本、减少同步阻塞点
  • 短期降级业务逻辑

    • 把“非必须动作”异步化/延后(例如:先落库再慢补、先写队列后聚合处理)

注意:轻量提速受 Queue 上限影响,改善有限,但胜在风险小、见效快。

3.2 中量提速:临时扩并行(仅对“新流量”有效)

适用:希望后续不要继续积压,但对存量帮助有限

  • 扩容 Topic Queue(原 Topic)

    • 作用:提升后续新消息的并行度
    • 限制:对已经积压的存量消息帮助很小(存量仍在旧队列)

该动作属于“止住新增积压”,不是“立刻清掉老积压”。

3.3 重量提速:搬运工模式(对“存量积压”最有效)

适用:存量巨大(千万级/亿级)、Queue 上限限制明显、Broker/下游有余量、且能处理幂等与顺序风险

执行步骤(严格按顺序):

Step 1:风险校验(不通过就暂停)

  • 顺序要求:是否严格顺序?

    • 若严格顺序:必须使用业务键(如订单号)保证同 key 进入同队列;否则禁用搬运
  • 幂等能力:消费侧是否幂等?

    • 若无幂等:搬运重试可能带来重复,需先补幂等或启用更安全的止损策略
  • Broker I/O 余量:读写翻倍是否扛得住?

    • Broker 已红:先扩 Broker/降载,再考虑搬运
  • DB/下游余量:并行消费放大后能否承受?

    • 承受不了:必须限速/分批,不能 100 并发硬冲

Step 2:创建 Topic-Temp(新池子)

  • Queue 设为足够大(如 100),保证后续真正并行
  • 设置合理的保留策略与磁盘容量预估(避免 Topic-Temp 把磁盘撑爆)

Step 3:上线“搬运 Consumer”(不跑业务,只转发)

  • 功能:旧 Topic 拉取 → 写入 Topic-Temp
  • 必须限速:按 Broker/网络/磁盘能力设定速度上限
  • 必须保留关键属性:业务 key、tags、trace 等(便于排障与顺序控制)

Step 4:offset 提交规则(防丢消息底线)

  • 原则:先确认写入 Topic-Temp 成功,再提交旧 Topic offset
  • 影响:失败重试可能导致重复,因此消费端幂等必须成立

Step 5:上线业务 Consumer 消费 Topic-Temp

  • 实例数 ≈ Queue 数(如 100)
  • 初期建议“从小到大”逐步放量(例如 10 → 30 → 60 → 100),每次观察 DB/Broker 指标是否稳定
  • 若下游有压力:必须配合限速、熔断、降级、分批

Step 6:积压下降后收口

  • 当旧 Topic 积压清空/稳定:停止搬运
  • Topic-Temp 消费完:下线临时 Topic 或缩短保留时间释放磁盘
  • 恢复常态消费拓扑

4)方案 B:止损(当“救全量”代价过大)

适用:非核心消息(日志、埋点、非资金通知)或继续处理会拖垮系统

止损选项:

  • Skip Offset:直接跳过老积压,让系统恢复实时水位
  • 导流死信队列(DLQ):把老积压转存,低峰慢补
  • 丢弃策略:对可丢的数据直接丢弃,优先保线上核心链路

止损的目标不是“数据完美”,而是“服务先活下来”。


5)验证标准

  • 积压量开始下降,且下降速度稳定可控
  • 消费延迟回落到业务可接受范围
  • Broker 磁盘水位与 IO 指标回到安全区
  • DB/外部接口 RT 与错误率未因扩并行而恶化
  • 告警从“持续红”变为“可控且逐步消退”

6)事后复盘判断

必须找到“消费为什么慢”,常见根因按概率排序:

  • DB 慢 SQL / 索引缺失 / 锁竞争 / 连接池不足
  • 外部接口超时、重试策略不当导致雪崩
  • 消费逻辑过重、同步链路过长
  • 线程池/队列堆积、背压缺失
  • Topic Queue 设计过小、容量评估缺失
  • 缺少削峰限流、缺少降级开关、缺少压测基线

7)速记版

  1. 积压 = 生产 > 消费
  2. 集群消费下,实例并行度 ≤ Queue 数
  3. 扩 Consumer 超过 Queue 数会空转
  4. 改原 Topic 的 Queue 主要利好新消息,救不了存量
  5. 清存量要么优化/限速,要么搬运到更多 Queue 的新 Topic
  6. 做任何提速前必须确认:顺序、Broker I/O、下游 DB 是否扛得住