RocketMQ 积压大量消息处理方式
这篇文章是对 https://mp.weixin.qq.com/s/ih6hhTPTAgfT0L6MLObmqA 的学习,主要是针对 RocketMQ 积压大量消息的场景,分享一些排查思路和解决方案。
在使用 RocketMQ 承接核心链路流量的系统中,突发流量容易把消费端与下游处理链路推到瓶颈区:生产速度短时间内大于消费速度,Topic 出现大规模积压并触发告警。此类场景的关键不在“盲目扩容消费者”,而在先确认队列模型带来的并行上限与下游承载边界,再选择能真实提升吞吐的处理方式,并把顺序性、Broker I/O、数据库压力等风险纳入救火方案。
1)先理解基础概念
RocketMQ 里最常见的 4 个名词
-
Topic:消息的大类目,类似“一个频道”。例如
order_event、pay_event。 -
Message(消息):一条具体的数据记录,例如“订单创建了”“支付成功了”。
-
Queue(队列):Topic 内部会被切成多个“分片”。可以把 Topic 想成一条大河,Queue 就是多条分流的河道。
- Queue 数越多,理论上并行处理能力上限越高(因为可以多路并行)。
-
Consumer(消费者):真正处理消息的程序/实例,通常部署在多台机器上跑。

关键规则:集群消费模式下的“一夫一妻制”
文章讲的核心铁律是:
同一个 Consumer Group 里,一个 Queue 同一时刻只能分配给一个 Consumer 实例消费。
用更直白的比喻:
- Queue 像“窗口”,Consumer 像“办事员”。
- 一个窗口只能站一个办事员;多来的办事员没有窗口就只能闲着。
这条规则就是为什么“面试官”会嘲笑“盲目加机器”。
2)故事背景
线上突发流量 → 生产者疯狂发消息 → Topic 积压到 1 亿条 → 消费端处理不过来 → 监控告警(延迟、堆积量、失败率上升)。
理解的关键点:
- 积压不是“消息丢了”,而是“消息还在 MQ 里排队没被处理”。
- 队列里排队太多,意味着业务处理有延迟,严重时可能影响主链路。
3)原因(为什么会积压?为什么“加机器”常常没用?)
3.1 积压的本质原因:生产速度 > 消费速度
这句话很抽象,拆开看:
- 生产速度:每秒往 MQ 写多少条消息(QPS)。
- 消费速度:每秒能处理多少条消息(TPS),而且通常要做业务动作:查库、写库、调用第三方等。
一旦生产速度持续高于消费速度,就会像“进门的人比出门的人多”,队伍越来越长。
3.2 为什么“加 Consumer 机器”可能完全无效
假设:Topic 只有 4 个 Queue(很多系统创建 Topic 默认就是少量队列)。
- 现在有 2 台机器跑消费者:每台分到 2 个 Queue,刚好都在干活。
- 扩到 10 台机器:最多只有 4 台能分到 Queue,剩下 6 台没有 Queue 只能围观。 结果:吞吐并不会因为多了 6 台机器而提高。
结论:
Queue 是并行度的“天花板”。Queue 不够,多出来的消费者等于摆设。
3.3 为什么不能直接把原 Topic 的 Queue 改成 100 来救急
直觉上:Queue 少就加 Queue。 但文章强调:这对“已经积压的存量消息”没用。
原因是 RocketMQ 的存储是按 Queue 分开的:
- 积压的 1 亿条消息已经“躺在旧的 4 个 Queue 里”。
- 你把 Queue 改到 100 后,RocketMQ 只会把新来的消息写到新 Queue。
- 那 1 亿条老消息还在旧队列里,仍然只能 4 个消费者并行慢慢啃。 所以“改配置”更多是长期优化,不是即时止血。
4)处理积压(搬运工模式)
核心思想:旧池子太窄,就把水先导到新池子
旧 Topic 的 4 个 Queue 限制死了并行能力。 解决办法不是在旧池子里硬撑,而是把积压先搬到一个“更宽的新池子”。
第一步:写一个“搬运工 Consumer”(不跑业务,只搬运)
搬运工做什么?
- 从旧 Topic 读消息(按原来的 4 个 Queue 读)
- 不做业务处理(不查库、不算逻辑、不调接口)
- 直接把消息写到新 Topic(Topic-Temp)
为什么这样会快?
- 因为业务处理通常才是慢的(DB、网络、锁),而搬运只做 IO,速度接近上限。
- 即使旧 Topic 只有 4 个 Queue,也能把“把消息读出来”这件事做得很快。
阶段性结果:旧 Topic 的积压量快速下降,告警先止血。
第二步:新建 Topic-Temp,把 Queue 开大(比如 100)
新 Topic 的设计目标很明确:
让并行度足够高,让业务处理能被分摊到更多机器上。
Topic-Temp:Queue = 100 这样就能部署 100 个业务消费者实例,每个实例稳定拿到 1 个 Queue 干活。
第三步:让真正的业务 Consumer 去消费 Topic-Temp
这时才跑“真实业务逻辑”:查库、写库、扣库存、发通知等。 因为并行度已经拉开,整体处理速度提升。
这一套的本质
- 旧 Topic:受限于“存量在 4 个 Queue”
- 新 Topic:用更多 Queue 提供更高并行度 这就是文章说的:空间换时间(新增一个 Topic 作为中转空间,换取更快的处理速度)。
5)结果
体现了三个能力:
- 懂 RocketMQ 负载均衡/分配机制:知道 Queue 才是并行度上限。
- 能处理“存量”问题:知道改 Queue 只对增量有效,存量要换通道。
- 能给可落地的 SOP:搬运→扩队列→并行处理,能在生产救火。
6)风险点(容易忽略,但最致命)
风险 1:顺序消息可能被“搬乱”
什么是顺序消息?
- 例如订单事件必须按顺序:创建 → 支付 → 完成 如果乱序,下游可能先收到“支付成功”,再收到“订单创建”,业务直接错乱。
为什么搬运会乱序?
- 搬运通常会多线程/多实例快速读取 + 写入新 Topic,写入时可能打散顺序。
怎么做才相对安全?
- 搬运时保留 Sharding Key(如订单号),保证同一订单的消息仍进入新 Topic 的同一个 Queue,至少做到“同一订单内部不乱”。
记忆:顺序要求越强,搬运越要“按同一业务键归队”。
风险 2:Broker I/O 被打爆
搬运等于让 Broker 同时承受:
- 从旧 Topic 大量读(读磁盘/网络)
- 往新 Topic 大量写(写磁盘/网络)
I/O 直接翻倍,Broker 可能先挂。
对策:
- 先看监控(磁盘利用率、IO wait、CPU、网络)
- Broker 已经红了:先扩容 Broker / 降速搬运 / 先限流再搬运
风险 3:下游数据库被 100 并发冲垮(最常见二次事故)
积压很多时候就是因为 DB 慢、接口慢。 这时如果把业务消费者扩到 100:
- 消费速度确实变快
- 但 100 个并发一起打 MySQL/第三方,相当于对自己发起 DDoS
- 结果:DB 崩了,全站 P0
对策:
- 扩容前先确定瓶颈是否在 DB/外部依赖
- 必要时:限流、降级、批处理、写入队列后异步合并、加缓存、分库分表等
7)方案 B(止损策略:不是所有消息都值得救)
如果积压的是非核心数据(日志、埋点、非资金通知),继续硬处理可能影响新消息进入、拖垮系统。 此时更成熟的做法是“先保主业务”:
- Skip Offset:直接跳过老积压,让系统回到实时水位
- 导入死信队列:把老消息转存,等低峰慢慢补
理解:
先救活病人,再讨论康复训练;先恢复服务,再谈补历史数据。
8)复盘结论(学习落点:止血之后要治本)
积压只是一种症状,真正需要追的是“为什么消费变慢”:
- DB 变慢?(慢 SQL、锁、连接池不足)
- 第三方接口超时?
- 业务代码死锁/线程池打满?
- 消费逻辑幂等/重试导致雪崩?
- 流量突增缺乏限流/削峰?
最终目标是:
- 事故时能用 SOP 快速止血
- 平时能用架构手段避免再次发生(限流、削峰、隔离、容量评估、压测、监控告警)
9) 细节
积压“搬运一定能救火”
搬运只是止血手段之一,前提是“消费逻辑本身没问题,只是并行度不够”。 如果消费逻辑有缺陷(慢 SQL、外部依赖慢等),搬运后依然会积压。
搬运会让 Broker 同时“读旧 + 写新”,额外制造一倍的网络/磁盘压力。 如果 Broker 已经红线(磁盘 IO、CPU、PageCache、网络),搬运可能先把 MQ 打挂——这时搬运不是救火,是添柴。
方案要成立,必须满足的前置条件:
① 消息语义允许“至少一次”且能容忍重复(或已做幂等)
搬运时常见链路是: 旧 Topic 拉取 → 发到新 Topic → 提交旧 Topic 的 offset 只要中间任一步失败重试,就可能出现重复投递(这是 MQ 常态)。 如果下游写库/扣款/状态流转没有幂等,重复会直接造成业务错账。
必须具备之一:
消费侧业务幂等(推荐:按业务唯一键去重,如订单号 + 事件类型 + 版本号)
或搬运侧引入去重/事务保证(复杂且成本高,救火时不一定来得及)
② 消息顺序要求清晰:能否乱序?是“全局顺序”还是“同一业务键顺序”?
RocketMQ 的“顺序消息”一般是同一 Sharding Key(如订单号)在同一队列内有序,而不是全局有序。 搬运会破坏顺序的典型原因是:并发搬运时把同一订单的消息写进了不同队列。
若要求同一订单内严格顺序:
搬运写入 Topic-Temp 时必须用同样的 key 做队列选择(保持同 key 落同队列)
消费 Topic-Temp 时需要按顺序消费模式处理(或至少保证单 key 串行)
③ Broker 与网络 I/O 有余量,且能承受“读写翻倍”
需要在监控里确认:磁盘利用率、IO wait、写入延迟、网络带宽、Broker 堆外内存等。 一旦资源已爆,优先动作应是:限流/降载/扩 Broker/扩盘,而不是搬运。
④ 下游(DB/外部接口)能承受扩出来的并发
如果积压根因是 DB 慢、第三方慢,扩大并发只会把下游打挂。 这时正确动作往往是:
先定位瓶颈(慢 SQL、连接池、锁、接口超时)
对消费进行限速、分批、熔断、降级
或把重操作异步化/落盘再慢慢补
对“搬运工模式”还必须补上的工程细节
-
offset 提交时机 必须做到“发到新 Topic 成功后,再提交旧 Topic 的 offset”。否则可能丢消息。 但这样会带来重复风险,因此幂等是配套前提。
-
限速与背压 搬运程序不能“光速全开”,需要按 Broker/Topic-Temp/下游承载做限流;否则把 MQ 写爆或把 Topic-Temp 撑满磁盘。 Topic-Temp 的保留策略与容量 Topic-Temp 会额外占用磁盘(甚至比原 Topic 更大,因为重复存储一份)。需要评估 retention、磁盘水位、清理策略。
-
消息属性保留 如果原消息依赖 tags、keys、延时等级、重试次数、trace 等属性,搬运时要尽量保留,否则排障与业务行为可能变化。
-
新旧两条流如何收口(搬运期间新消息仍在进入原 Topic)。
- 原 Topic 的“正常消费者”是否继续跑(通常要继续,避免新积压)
- 搬运只处理“老积压”还是全量
- 何时停止 Topic-Temp,如何切回常态
10) 如何排查
0)启动条件与目标
触发条件:
- Topic 积压量持续上升(例如 > 1000 万且仍在增长)
- 消费延迟持续扩大(例如延迟分钟级/小时级)
- 业务告警出现:订单/支付/通知延迟、失败率升高、超时升高
明确目标:
- 先让系统不再继续恶化(止血)
- 再把积压降到可控水位(提速)
- 最后恢复常态并防复发(收口)
1)第 1 阶段:快速分诊
目的:判定“加速是否安全”,避免一上来就把 Broker 或 DB 打挂。
1.1 必看监控
MQ 侧:
- 积压量(消息堆积数):是否持续上涨、上涨速度(每分钟增加多少)
- 消费 TPS / 生产 TPS:是否生产明显大于消费
- 消费延迟:是否快速扩大
- Broker 磁盘水位:是否接近满盘
- Broker I/O(IO wait、写入/读取延迟):是否接近瓶颈
- Broker CPU/网络带宽:是否飙高
下游侧(最容易引发二次事故):
- DB QPS / 连接数 / 慢 SQL 数 / 锁等待
- 第三方接口 RT / 超时率 / 熔断情况
- 业务服务线程池/连接池:是否打满、是否大量超时重试
1.2 快速判定“瓶颈在哪”(三选一)
- A:队列并行度不足(Queue 少) 特征:Topic Queue 数很少;Consumer 实例多但很多空转;单个实例很忙但整体 TPS 上不去
- B:下游处理慢(DB/外部接口/业务逻辑慢) 特征:DB 慢 SQL、锁等待、接口超时;消费线程堆积;消费失败/重试多
- C:Broker 资源吃紧(磁盘/IO/网络) 特征:Broker IO wait 高、磁盘写入延迟高、磁盘水位告急、网络打满
判定结果会决定后续能不能“搬运/扩并行”,不然会救火变纵火。
2)第 2 阶段:先止血(避免继续恶化)
2.1 立即执行的通用止血动作
- 开启/加强限流与削峰(若入口可控):降低生产 TPS,优先保核心链路
- 关闭非必要重试风暴:避免失败重试把系统拖死(特别是下游超时导致的级联重试)
- 隔离非核心 Topic/业务:把资源让给核心 Topic
2.2 分诊后的“止血优先动作”
如果判定是 C:Broker 资源吃紧
- 优先:扩容 Broker / 扩盘 / 降低写入压力 / 限速消费与生产
- 禁止:先上“搬运工模式”(读旧写新会让 IO 翻倍,极易把 Broker 打挂)
如果判定是 B:下游处理慢
- 优先:修下游瓶颈(慢 SQL、索引、连接池、缓存、接口熔断降级)
- 同时:对消费端限速(宁可慢消化,也不要把 DB 冲垮)
- 暂缓:盲目扩容 100 个消费者(高概率把 DB 打挂)
如果判定是 A:Queue 并行度不足
- 可以进入“提速方案”(第 3 阶段),但仍需先做风险校验(顺序/I/O/DB)
3)第 3 阶段:提速(核心处理方案选择)
按“能否立刻提升吞吐”从轻到重排列,优先选风险低的。
3.1 轻量提速(优先级最高,改动小)
适用:下游承载尚可、Broker 不红线、但消费效率偏低
-
提高单实例并发/批量能力(谨慎)
- 提升消费线程数(非顺序消费场景才适用)
- 增大批量拉取/批量消费(减少网络/调度开销)
-
减少无用开销
- 关闭不必要日志、降低序列化成本、减少同步阻塞点
-
短期降级业务逻辑
- 把“非必须动作”异步化/延后(例如:先落库再慢补、先写队列后聚合处理)
注意:轻量提速受 Queue 上限影响,改善有限,但胜在风险小、见效快。
3.2 中量提速:临时扩并行(仅对“新流量”有效)
适用:希望后续不要继续积压,但对存量帮助有限
-
扩容 Topic Queue(原 Topic)
- 作用:提升后续新消息的并行度
- 限制:对已经积压的存量消息帮助很小(存量仍在旧队列)
该动作属于“止住新增积压”,不是“立刻清掉老积压”。
3.3 重量提速:搬运工模式(对“存量积压”最有效)
适用:存量巨大(千万级/亿级)、Queue 上限限制明显、Broker/下游有余量、且能处理幂等与顺序风险
执行步骤(严格按顺序):
Step 1:风险校验(不通过就暂停)
-
顺序要求:是否严格顺序?
- 若严格顺序:必须使用业务键(如订单号)保证同 key 进入同队列;否则禁用搬运
-
幂等能力:消费侧是否幂等?
- 若无幂等:搬运重试可能带来重复,需先补幂等或启用更安全的止损策略
-
Broker I/O 余量:读写翻倍是否扛得住?
- Broker 已红:先扩 Broker/降载,再考虑搬运
-
DB/下游余量:并行消费放大后能否承受?
- 承受不了:必须限速/分批,不能 100 并发硬冲
Step 2:创建 Topic-Temp(新池子)
- Queue 设为足够大(如 100),保证后续真正并行
- 设置合理的保留策略与磁盘容量预估(避免 Topic-Temp 把磁盘撑爆)
Step 3:上线“搬运 Consumer”(不跑业务,只转发)
- 功能:旧 Topic 拉取 → 写入 Topic-Temp
- 必须限速:按 Broker/网络/磁盘能力设定速度上限
- 必须保留关键属性:业务 key、tags、trace 等(便于排障与顺序控制)
Step 4:offset 提交规则(防丢消息底线)
- 原则:先确认写入 Topic-Temp 成功,再提交旧 Topic offset
- 影响:失败重试可能导致重复,因此消费端幂等必须成立
Step 5:上线业务 Consumer 消费 Topic-Temp
- 实例数 ≈ Queue 数(如 100)
- 初期建议“从小到大”逐步放量(例如 10 → 30 → 60 → 100),每次观察 DB/Broker 指标是否稳定
- 若下游有压力:必须配合限速、熔断、降级、分批
Step 6:积压下降后收口
- 当旧 Topic 积压清空/稳定:停止搬运
- Topic-Temp 消费完:下线临时 Topic 或缩短保留时间释放磁盘
- 恢复常态消费拓扑
4)方案 B:止损(当“救全量”代价过大)
适用:非核心消息(日志、埋点、非资金通知)或继续处理会拖垮系统
止损选项:
- Skip Offset:直接跳过老积压,让系统恢复实时水位
- 导流死信队列(DLQ):把老积压转存,低峰慢补
- 丢弃策略:对可丢的数据直接丢弃,优先保线上核心链路
止损的目标不是“数据完美”,而是“服务先活下来”。
5)验证标准
- 积压量开始下降,且下降速度稳定可控
- 消费延迟回落到业务可接受范围
- Broker 磁盘水位与 IO 指标回到安全区
- DB/外部接口 RT 与错误率未因扩并行而恶化
- 告警从“持续红”变为“可控且逐步消退”
6)事后复盘判断
必须找到“消费为什么慢”,常见根因按概率排序:
- DB 慢 SQL / 索引缺失 / 锁竞争 / 连接池不足
- 外部接口超时、重试策略不当导致雪崩
- 消费逻辑过重、同步链路过长
- 线程池/队列堆积、背压缺失
- Topic Queue 设计过小、容量评估缺失
- 缺少削峰限流、缺少降级开关、缺少压测基线
7)速记版
- 积压 = 生产 > 消费
- 集群消费下,实例并行度 ≤ Queue 数
- 扩 Consumer 超过 Queue 数会空转
- 改原 Topic 的 Queue 主要利好新消息,救不了存量
- 清存量要么优化/限速,要么搬运到更多 Queue 的新 Topic
- 做任何提速前必须确认:顺序、Broker I/O、下游 DB 是否扛得住