基于 虚拟线程 构建高性能 Kafka → HTTP 转发服务
在物联网/车联网场景中,如果需要将设备数据转发到第三方平台,为了不影响IOT平台性能,通常将设备数据通过 Kafka 进行高速传输,但很多第三方平台只提供 HTTP 接口。如何在两者之间搭建一座"高性能、不丢消息"的桥梁?本文将分享我基于 Spring Boot 3.3 + 虚拟线程 + PostgreSQL 实现的项目。
ps: 本文的目的是记录一个项目设计过程,便于日后复盘,但暂不提供实际压测指标(如吞吐量、延迟、资源消耗)
一、业务背景
1.1 场景描述
这个项目源于一个真实的智慧停车业务场景。路边停车的摄像头设备会实时上报两类数据:
- 车辆出入事件:每当一辆车驶入或驶出泊位,摄像头会抓拍一组图片(全景图、车牌图、特写图等),并生成一条包含车牌号、出入时间、可信度等字段的 JSON 消息
- 心跳抓拍图:摄像头每 5 分钟自动拍摄一张泊位照片,用于确认设备在线状态和泊位实时状态
这些消息由设备端统一推送到 Kafka 集群,需要从中消费消息,转换字段格式后,实时转发给第三方停车管理平台。第三方平台只提供 HTTP REST 接口,不支持 Kafka 直连。
请注意: 后续文章中写的 JVM 崩溃 只是大致的说辞,实际可能是 程序意外、服务器意外断电等等。这里将"JVM 崩溃"定义为"服务异常"。
1.2 核心挑战
听起来像是一个简单的"消费 Kafka → 发 HTTP"需求,但深入分析后会发现几个关键挑战:
挑战一:消息绝对不能丢。 每一条车辆出入事件都对应一次真实的停车计费。如果丢了"驶入"事件,用户可能免费停车;如果丢了"驶出"事件,用户可能被永久计费。这不仅仅是技术问题,直接关系到收费准确性。
挑战二:字段映射不是简单的转发。 Kafka 中的消息结构和第三方平台要求的格式差异很大。比如出入类型的枚举值不同(Kafka 用 0/1 表示出/进,平台用 1/2 表示驶入/驶出),可信度字段可能一个是小数一个是整数,图片路径格式不一致,图片类型的枚举映射也存在一对多关系。
挑战三:并发压力不可预测。 高峰期(早晚通勤)可能有大量车辆集中进出,QPS 瞬间飙升;而大部分时间消息量很低。系统既要在高峰期扛住压力,又不能为低峰期浪费过多资源。
挑战四:第三方平台可能不稳定。 HTTP 接口可能超时、返回 5xx、或者返回业务错误码。我们不能因为对方服务不可用就丢掉消息,必须有可靠的重试机制。
1.3 整体架构
综合以上挑战,设计了以下架构:
Kafka Broker
│ batch poll (max 500 records)
▼
DeviceDataKafkaListener (3 并发分区线程)
│ 每条消息先 INSERT 到 PostgreSQL
▼
ForwardDispatcher
│ tryAcquire(Semaphore) ◄─── 背压阀门 (max-concurrency)
▼
VirtualThreadExecutor (Project Loom)
│
▼
HttpForwardService
│ JDK HttpClient (HTTP/2, 非阻塞)
│
├─► 2xx → SUCCESS
├─► 非2xx/超时 → PENDING, 计算指数退避
└─► 超过 maxAttempts → FAILED
▲
RetryScheduler ────────────────────┘
每 5s 扫描 PENDING & next_retry_at <= NOW()
SELECT … FOR UPDATE SKIP LOCKED (多实例安全)这里因该已经注意到一个不太常规的设计:每条消息在 HTTP 转发之前,先写入 PostgreSQL。这么设计的思考过程,在后续详细展开。
二、技术栈选型
在开始写代码之前,我做了一番技术选型权衡:
| 组件 | 选型 | 选型理由 |
|---|---|---|
| JDK | 26 | 虚拟线程 (Project Loom),极低内存并发 |
| 框架 | Spring Boot 3.3 | 成熟生态,自动装配 |
| Kafka 客户端 | Spring Kafka(批量监听) | 批量 poll 提升吞吐 |
| HTTP 客户端 | JDK HttpClient | 内置 HTTP/2,零额外依赖 |
| 并发模型 | 虚拟线程 + Semaphore | 天然背压,无需手动管理线程池 |
| 数据库 | PostgreSQL + Spring Data JPA | 持久化重试,SKIP LOCKED 分布式安全 |
| ID 生成 | 雪花算法 (Snowflake) | 趋势递增,B+Tree 友好 |
| 监控 | Micrometer + Prometheus | 标准可观测性 |
为什么选虚拟线程而不是 Reactor/WebFlux?
响应式编程(WebFlux + Netty)确实能以极少的线程处理大量并发,但它带来了两个问题:一是调试困难,堆栈信息难以追踪;二是整个调用链都必须是响应式的,一旦中间有阻塞操作就会前功尽弃。
而 虚拟线程 提供了"代码写起来像同步阻塞,运行起来像异步非阻塞"的体验。每个 HTTP 转发任务就是一个普通的虚拟线程,代码直观、易维护,同时内存开销极低。500 个并发虚拟线程只消耗约 2MB,而同样数量的平台线程需要约 100MB 栈内存(ps: 这里的数值是大致的值,只是为了有个简单的比较)。
为什么选 JDK HttpClient 而不是 OkHttp/Apache HttpClient?
项目只需要做简单的 POST 请求转发,不需要连接池管理、拦截器、重试策略等高级功能。JDK 11+ 内置的 HttpClient 已经支持 HTTP/2、异步请求、超时控制,完全够用,还能减少一个外部依赖。
为什么用雪花 ID 而不是数据库自增或 UUID?
数据库自增 ID 在分布式多实例部署时会冲突;UUID 是随机值,作为 PostgreSQL 主键会导致 B+Tree 索引频繁分裂,写入性能差。雪花 ID 既是全局唯一的,又是趋势递增的,对索引友好。而且从 ID 本身就能反推出大致的生成时间,排查问题非常方便。
三、核心设计:为什么先写 DB 再异步 HTTP?
3.1 三种方案的对比
面对"消费 Kafka → 发 HTTP"这个需求,常见有三种做法:
方案 A:纯内存转发(最快但不安全)
Kafka → 内存队列 → HTTP 转发吞吐量最高,但 JVM 崩溃或重启时,内存队列中的消息全部丢失。对于停车计费场景,这是不可接受的。
方案 B:先转发再写 DB(看似合理但有隐患)
Kafka → HTTP 转发 → 写 DB 记录结果HTTP 转发成功但写 DB 失败时,这条消息既没有 DB 记录,下次也不会被重新消费(Kafka offset 已提交),等于消息丢了。
方案 C:先写 DB 再转发(本项目采用)
Kafka → 写 DB → 提交 offset → 异步 HTTP 转发3.2 为什么方案 C 最安全
优点
核心逻辑是:先持久化,后处理。
- Kafka 消息到达后,立即 INSERT 到 PostgreSQL(状态为 PENDING)
- 所有消息落库完成后,手动提交 Kafka offset
- 然后才把消息提交到虚拟线程进行 HTTP 转发
这样一来,无论在哪个环节出问题,消息都不会丢:
- 落库后、提交 offset 前 JVM 崩溃:offset 没有提交,重启后会重新投递这些消息。而
messageKey唯一索引保证重复消费不会产生重复记录 - 提交 offset 后、HTTP 转发前 JVM 崩溃:DB 中已有 PENDING 记录,
RetryScheduler重启后会自动扫出这些记录继续投递 - HTTP 转发失败:记录状态被设回 PENDING 并计算下次重试时间,等待
RetryScheduler在合适的时机重新投递
缺点:
这个方案虽然安全,但其本质是将原本由 Kafka 承载的高并发压力转移到了数据库上,所以尽可能避免虚拟线程发起http时持有数据库连接。
四、关键代码实现
4.1 启动类 —— 开启虚拟线程
@Slf4j
@SpringBootApplication
@EnableScheduling
public class KafkaForwarderApplication {
public static void main(String[] args) {
System.setProperty("spring.threads.virtual.enabled", "true");
SpringApplication.run(KafkaForwarderApplication.class, args);
log.info("Kafka Forwarder 启动成功。");
}
}关键点是这行 System.setProperty("spring.threads.virtual.enabled", "true")。它让 Spring Boot 全局启用虚拟线程支持——Tomcat 请求处理线程、@Scheduled 调度线程、@Async 异步任务等,都会自动使用虚拟线程。这比手动创建虚拟线程执行器更彻底。
@EnableScheduling 是为了启用 RetryScheduler 的定时任务能力,每 5 秒扫描一次需要重试的记录。
4.2 核心 Bean 配置 —— 虚拟线程 + 背压阀门
@Configuration
@EnableConfigurationProperties(ForwarderProperties.class)
public class AppConfig {
@Bean(destroyMethod = "close")
public ExecutorService virtualThreadExecutor() {
return Executors.newVirtualThreadPerTaskExecutor();
}
@Bean
public Semaphore httpConcurrencySemaphore(ForwarderProperties props) {
int permits = props.getExecutor().getMaxConcurrency();
return new Semaphore(permits, true);
}
@Bean
public HttpClient httpClient(ForwarderProperties props) {
ForwarderProperties.Http httpCfg = props.getHttp();
return HttpClient.newBuilder()
.version(HttpClient.Version.HTTP_2)
.connectTimeout(Duration.ofSeconds(httpCfg.getConnectTimeoutSeconds()))
.executor(Executors.newVirtualThreadPerTaskExecutor())
.followRedirects(HttpClient.Redirect.NORMAL)
.build();
}
}为什么要用 Semaphore 做背压?
虚拟线程的创建成本极低,理论上可以无限制地创建。如果不加限制,在 Kafka 消费速度远超 HTTP 吞吐速度时(比如第三方平台突然变慢),会瞬间创建数万个虚拟线程同时等待 HTTP 响应。虽然虚拟线程本身不占多少内存,但每个并发请求都会占用一个数据库连接和一个 TCP 连接,最终可能导致数据库连接池耗尽或 OOM。
Semaphore 的作用就是一道"阀门":当 in-flight 的 HTTP 请求达到上限(默认 100)时,新消息不再投递到虚拟线程,而是保持在 DB 中的 PENDING 状态,等待 RetryScheduler 后续处理。这样既保护了系统,又不会丢消息。
注意 new Semaphore(permits, true) 中的 true 参数表示公平模式,确保等待时间最长的任务优先获得许可,防止某些消息被无限期饿死。
为什么 HttpClient 也要指定虚拟线程执行器?
JDK HttpClient 的异步回调(如响应处理)默认使用公共 ForkJoinPool。显式指定虚拟线程执行器后,HTTP 响应回调也在虚拟线程上执行,避免阻塞 ForkJoinPool 中的平台线程。
4.3 Kafka 消费配置 —— 手动提交 + 批量模式
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
ConsumerFactory<String, String> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setConsumerFactory(consumerFactory);
// 并发线程数 = 分区数,这里的listenerConcurrency=3
factory.setConcurrency(listenerConcurrency);
// 启用批量模式,一次 onMessage 调用处理多条消息
factory.setBatchListener(true);
// 手动立即提交 offset
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
// 容器级错误处理:间隔 1s 重试 3 次,之后记录日志并跳过(数据已在DB中)
factory.setCommonErrorHandler(new DefaultErrorHandler(new FixedBackOff(1000L, 3L)));
return factory;
}这里有几个关键设计:
为什么用 MANUAL_IMMEDIATE 而不是自动提交?
Kafka 默认的自动提交策略是定期提交 offset,不关心消息是否真正处理成功。如果在自动提交之后、HTTP 转发之前 JVM 崩溃,这些消息就永远不会被重新消费。使用 MANUAL_IMMEDIATE 精确控制提交时机:只有当整批消息全部写入 DB 后,才调用 ack.acknowledge() 提交 offset。这就是 at-least-once 语义的保证。
为什么用批量模式(setBatchListener(true))?
单条消费模式下,每收到一条消息就触发一次 Listener 方法调用,增加 Spring 事务管理、方法栈等开销。批量模式下,一次 poll 拿到的 500 条消息作为一个 List 传入,我们可以在方法内循环处理,减少框架层面的重复开销。
ps: 这里的500是再配置文件中 max-poll-records 配置的batch fetch size。
并发数设计
这和 Kafka Topic 的分区数有关。Spring Kafka 的 concurrency 参数决定了创建多少个 KafkaMessageListenerContainer,每个容器负责消费一个分区。如果并发数大于分区数,多余的容器会空闲;如果小于分区数,部分容器需要消费多个分区。一般建议并发数等于分区数。
错误处理器为什么只重试 3 次就跳过?
DefaultErrorHandler(new FixedBackOff(1000L, 3L)) 表示如果 Listener 方法本身抛出异常(比如 JSON 解析失败),间隔 1 秒重试 3 次。如果仍然失败就跳过这条消息,记录错误日志。之所以不无限重试,是因为 Listener 层面的异常通常是数据格式问题(比如消息体不是合法 JSON),重试多少次都不会成功。而且消息已经落库了,后续可以通过人工处理 FAILED 记录来补救。
4.4 Kafka Listener —— 幂等入库 + 异步投递
@KafkaListener(
topics = "${forwarder.kafka.vehicle-event-topic}",
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "kafkaListenerContainerFactory"
)
public void onVehicleEvent(
List<ConsumerRecord<String, String>> records,
Acknowledgment ack) {
processBatch(records, ack, ApiType.VEHICLE_EVENT);
}
private void processBatch(List<ConsumerRecord<String, String>> records,
Acknowledgment ack, ApiType apiType) {
for (ConsumerRecord<String, String> cr : records) {
String messageId = extractMessageId(cr);
ForwardRecord record = ForwardRecord.builder()
.messageKey(ForwardRecord.buildMessageKey(messageId))
.apiType(apiType)
.payload(cr.value())
.status(ForwardStatus.PENDING)
.build();
dispatcher.dispatchAsync(record);
}
ack.acknowledge();
}为什么用 messageId 而不是 Kafka offset 做幂等键?
Kafka offset 是相对于 Topic-Partition 的,看起来很适合做唯一标识。但有一个问题:如果 Kafka 管理员因为某种原因重置了 consumer group 的 offset(earliest 或 latest),大量"旧"消息会被重新消费。此时 offset 虽然相同,但消息可能已经被成功转发过了。
而我选择从消息 JSON 中提取 messageId 字段作为幂等键。这个 ID 是上游IOT平台生成的,天然全局唯一,不受 Kafka offset 变化影响。配合数据库的 message_key 唯一索引,即使同一条消息被消费 100 次,也只会入库一次。
如果 messageId 缺失怎么办?
我在代码中做了降级处理:如果消息 JSON 中没有 messageId 字段,就用 topic + partition + offset 的组合作为备选键,同时打印一条 WARN 日志提醒运维人员排查上游数据问题。这样既不会因为上游数据质量问题而丢失消息,又能及时发现问题。
为什么 ack.acknowledge() 放在循环外面?
这是 at-least-once 语义的关键。所有消息全部处理(落库)完成后,才一次性提交 offset。如果放在循环内部每条消息处理后就提交,那么第 300 条消息处理完提交后,如果第 301 条消息处理时 JVM 崩溃,前 300 条的 offset 已提交、但第 301~500 条永远不会被重新消费。
4.5 ForwardDispatcher —— 幂等 + 背压 + 虚拟线程投递
这是整个系统的调度中枢,也是最复杂的部分。
踩坑:主键冲突(PSQLException: duplicate key)
最初的实现中,dispatchAsync 方法加了 @Transactional,内部先 INSERT 再异步投递。但问题在于:异步投递是在虚拟线程中执行的,虚拟线程中的 httpForwardService.forward() 会执行另一次 repository.save() 来更新记录状态。如果外层事务还没提交,虚拟线程中的 save 会尝试用同一个 ID 再次 INSERT(因为事务隔离,虚拟线程看不到外层未提交的 INSERT),导致主键冲突。
解决方案:将"落库"和"投递"彻底拆分
public boolean dispatchAsync(ForwardRecord record) {
String messageKey = record.getMessageKey();
// 幂等检查
Optional<ForwardRecord> existing = repository.findByMessageKey(messageKey);
if (existing.isPresent()) {
return handleExistingRecord(existing.get(), messageKey);
}
// 独立事务 INSERT,方法返回时数据已提交
record.setId(snowflakeIdGenerator.nextId());
ForwardRecord saved = saveNewRecord(record);
return submitToVirtualThread(saved, "首次投递");
}
@Transactional(propagation = Propagation.REQUIRES_NEW)
public ForwardRecord saveNewRecord(ForwardRecord record) {
return repository.save(record);
}saveNewRecord 使用 Propagation.REQUIRES_NEW 独立事务,确保方法返回时数据已经 COMMIT。这样虚拟线程中的后续操作(UPDATE)看到的是已提交的数据,走的是 UPDATE 逻辑而不是 INSERT,彻底避免了主键冲突。
幂等处理:四种状态分别怎么处理
private boolean handleExistingRecord(ForwardRecord record, String messageKey) {
return switch (record.getStatus()) {
case SUCCESS -> {
log.info("[id={}] 已成功,跳过重复消费", record.getId());
yield true;
}
case FAILED -> {
log.warn("[id={}] 已 FAILED,跳过(需人工处理)", record.getId());
yield true;
}
case PENDING, IN_PROGRESS -> {
yield submitToVirtualThread(record, "重复消费-已有记录");
}
};
}这里的判断逻辑是:
- SUCCESS:说明之前已经成功转发过了,直接跳过。这是最理想的幂等场景——Kafka 重复投递了一条已经成功的消息
- FAILED:已经耗尽了所有重试次数,不能再自动重试了,需要人工介入。跳过是为了避免无限循环
- PENDING / IN_PROGRESS:说明之前尝试过但还没成功(可能上次 JVM 在转发过程中崩溃了),直接重新投递。注意这里不会重新 INSERT,只是把已有的记录重新提交到虚拟线程
Semaphore 背压的实现
private boolean submitToVirtualThread(ForwardRecord record, String label) {
boolean acquired = httpConcurrencySemaphore
.tryAcquire(timeoutMs, TimeUnit.MILLISECONDS);
if (!acquired) {
// Semaphore 满了,记录保持 PENDING 状态
// RetryScheduler 会在下次扫描时重新尝试投递
return false;
}
virtualThreadExecutor.submit(() -> {
try {
httpForwardService.forward(record);
} finally {
httpConcurrencySemaphore.release();
}
});
return true;
}注意 tryAcquire 有超时时间(默认 5 秒)。这意味着消费线程在获取不到许可时会等待 5 秒,而不是立即放弃。如果 5 秒内有其他 HTTP 请求完成释放了许可,当前消息就可以立即投递。只有等待 5 秒后仍然没有许可,才会放弃并返回 false。
返回 false 后,消息在 DB 中的状态是 PENDING,RetryScheduler 会在下一个周期(5 秒后)重新扫描到这条记录并尝试投递。这就是"背压不丢消息"的实现机制。
4.6 HttpForwardService —— 单次 HTTP 转发 + 指数退避
这个类负责一次完整的 HTTP 转发尝试。使用 Propagation.REQUIRES_NEW 独立事务,确保每次转发结果都被及时提交,不受外层事务影响。
@Transactional(propagation = Propagation.REQUIRES_NEW)
public void forward(ForwardRecord record) {
ForwarderProperties.Retry retryCfg = props.getRetry();
// 1. 标记为处理中,增加尝试次数
record.setStatus(ForwardStatus.IN_PROGRESS);
record.setAttemptCount(record.getAttemptCount() + 1);
repository.save(record);
// 2. 构建请求(包含字段映射转换)
HttpRequest httpRequest = requestBuilder.build(record);
// 3. 发送 HTTP 请求并计时
long startMs = System.currentTimeMillis();
HttpResponse<String> response = httpClient.send(
httpRequest,
HttpResponse.BodyHandlers.ofString(StandardCharsets.UTF_8));
long durationMs = System.currentTimeMillis() - startMs;
// 4. 根据响应更新状态
// ...
}为什么要先标记 IN_PROGRESS 再发请求?
如果直接发 HTTP 请求,在请求过程中 JVM 崩溃了,DB 中的记录状态仍然是 PENDING。RetryScheduler 重新投递时,会从头开始一次新的请求。但如果对方平台收到了请求并成功处理,这就变成了重复推送。
而先标记 IN_PROGRESS,即使 JVM 在 HTTP 请求过程中崩溃,RetryScheduler 看到的是一条 IN_PROGRESS 记录(而非 PENDING),可以据此判断:这条记录可能正在被另一个实例处理,或者上次处理中断了,需要更谨慎地处理。
HTTP 响应不是简单的 2xx 就成功
处理的不是简单的 HTTP 状态码判断。第三方平台的响应格式是:
// 成功
{ "result": 0, "data": {} }
// 业务失败
{ "result": 1, "errcode": "999", "errmsg": "上级系统发生错误" }HTTP 200 但 result != 0 也是失败。需要进一步判断 errcode:
if (resp.isSuccess()) {
record.setStatus(ForwardStatus.SUCCESS);
} else if ("4".equals(resp.getErrcode())) {
// errcode=4 是"重复上报",平台已忽略,视为成功
record.setStatus(ForwardStatus.SUCCESS);
} else if (!resp.isRetryable()) {
// errcode=998 明确说"无需重发",直接标记失败
markFailed(record, 200, errMsg);
} else if (record.getAttemptCount() >= retryCfg.getMaxAttempts()) {
// 可重试但超过最大次数
markFailed(record, 200, errMsg);
} else {
// 可重试,计算下次重试时间
scheduleRetry(record, retryCfg);
}errcode=4(重复上报)这个处理很有必要:因为本项目的 at-least-once 语义可能导致同一条消息被转发多次,平台返回 errcode=4 说明它已经处理过了,因此可以安全地标记为成功。
指数退避策略
private long computeDelay(int attemptCount, ForwarderProperties.Retry cfg) {
double delay = cfg.getInitialIntervalSeconds()
* Math.pow(cfg.getMultiplier(), attemptCount - 1);
return Math.min((long) delay, cfg.getMaxIntervalSeconds());
}以默认配置为例(初始 5 秒,因子 2.0,上限 300 秒,最多 5 次):
| 失败次数 | 等待时间 | 说明 |
|---|---|---|
| 第 1 次 | 5s | 5 × 2^0 |
| 第 2 次 | 10s | 5 × 2^1 |
| 第 3 次 | 20s | 5 × 2^2 |
| 第 4 次 | 40s | 5 × 2^3 |
| 第 5 次 | FAILED | 超过 maxAttempts,标记终态 |
为什么用指数退避而不是固定间隔?如果第三方平台短暂故障,大量消息同时以固定间隔重试,会产生"重试风暴",进一步加剧对方压力。指数退避让重试间隔逐渐增大,给对方恢复的时间。
4.7 RetryScheduler —— 数据库持久化重试
这是保证消息不丢的最后一道防线。
@Scheduled(fixedDelayString = "${forwarder.retry.scheduler-fixed-delay-ms:5000}")
@Transactional
public void processPendingRetries() {
List<ForwardRecord> candidates = repository.findPendingForRetry(
ForwardStatus.PENDING,
Instant.now(),
PageRequest.of(0, batchSize));
for (ForwardRecord record : candidates) {
dispatcher.dispatchRetry(record);
}
}为什么不使用内存队列(如 BlockingQueue)做重试?
内存队列的问题是:JVM 崩溃时队列中的数据全部丢失。假设有 100 条消息因为第三方平台故障正在等待重试,此时 JVM 重启了,这 100 条消息就永远丢了。
而我的方案是将重试记录持久化在 PostgreSQL 中。next_retry_at 字段记录了下次应该重试的时间,RetryScheduler 每 5 秒扫一次数据库,把到期的记录捞出来重新投递。JVM 重启后,调度器自动恢复,未完成的记录会被继续处理。
多实例安全:SKIP LOCKED 详解
@Lock(LockModeType.PESSIMISTIC_WRITE)
@Query("""
SELECT r FROM ForwardRecord r
WHERE r.status = :status
AND (r.nextRetryAt IS NULL OR r.nextRetryAt <= :now)
ORDER BY r.nextRetryAt ASC NULLS FIRST, r.id ASC
""")
List<ForwardRecord> findPendingForRetry(
@Param("status") ForwardStatus status,
@Param("now") Instant now,
Pageable pageable);生产环境通常会部署多个实例做高可用。如果两个实例同时扫表,可能捞到相同的记录并重复发送 HTTP 请求。为了解决这个问题,在查询中使用了两个关键机制。
@Lock(LockModeType.PESSIMISTIC_WRITE) 的作用
这是 JPA 的悲观写锁注解,Spring Data JPA 会自动在生成的 SQL 末尾追加 FOR UPDATE。生成的实际 SQL 等价于:
SELECT * FROM forward_records
WHERE status = 'PENDING'
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
ORDER BY next_retry_at ASC NULLS FIRST, id ASC
LIMIT 100
FOR UPDATEFOR UPDATE 的含义是:查到的每一行都会被加上行级排他锁。在当前事务提交或回滚之前,其他事务不能修改、删除这些行,也不能用 FOR UPDATE 锁定它们。
但这里有一个问题:如果两个实例几乎同时执行这条 SQL,实例 B 的 FOR UPDATE 会阻塞等待实例 A 释放锁。这虽然保证了正确性,但让实例 B 白白等待,浪费了调度周期。
SKIP LOCKED 的作用
Spring Data JPA 默认不会加 SKIP LOCKED,需要通过 Hibernate 的 hint 或自定义原生查询来启用。在本项目中,通过在 application.yml 中配置 Hibernate 方言来支持 SKIP LOCKED:
spring:
jpa:
properties:
hibernate:
dialect: org.hibernate.dialect.PostgreSQLDialect当 Hibernate 检测到 @Lock(PESSIMISTIC_WRITE) 配合 PostgreSQL 方言时,会在 SQL 中追加 SKIP LOCKED:
SELECT * FROM forward_records
WHERE status = 'PENDING'
AND (next_retry_at IS NULL OR next_retry_at <= NOW())
ORDER BY next_retry_at ASC NULLS FIRST, id ASC
LIMIT 100
FOR UPDATE SKIP LOCKEDSKIP LOCKED 让查询跳过已被其他事务锁定的行,而不是等待。这样实例 B 不会阻塞,而是拿到未被锁定的记录。
完整的多实例并发执行过程
假设数据库中有 6 条 PENDING 记录(ID: 1~6),两个实例几乎同时触发 RetryScheduler,每次最多取 3 条:
时间线:
T0: DB 中有 6 条 PENDING 记录:[记录1, 记录2, 记录3, 记录4, 记录5, 记录6]
T1: 实例 A 开启事务 Tx-A,执行:
SELECT ... FOR UPDATE SKIP LOCKED LIMIT 3
→ 锁定 记录1, 记录2, 记录3
→ 返回给实例 A
T2: 实例 B 开启事务 Tx-B,执行:
SELECT ... FOR UPDATE SKIP LOCKED LIMIT 3
→ 记录1 已被 Tx-A 锁定,SKIP(跳过)
→ 记录2 已被 Tx-A 锁定,SKIP(跳过)
→ 记录3 已被 Tx-A 锁定,SKIP(跳过)
→ 记录4 未锁定,锁定并返回
→ 记录5 未锁定,锁定并返回
→ 记录6 未锁定,锁定并返回
→ 返回给实例 B:[记录4, 记录5, 记录6]
T3: 实例 A 处理 [记录1, 记录2, 记录3],分别发起 HTTP 转发
实例 B 处理 [记录4, 记录5, 记录6],分别发起 HTTP 转发
→ 两个实例并行工作,互不干扰,每条记录只被处理一次
T4: 实例 A 转发完成,COMMIT Tx-A → 记录1/2/3 的锁释放
实例 B 转发完成,COMMIT Tx-B → 记录4/5/6 的锁释放如果实例 A 在 T3 崩溃了会怎样?
T3': 实例 A 崩溃,Tx-A 自动 ROLLBACK
→ 记录1/2/3 的锁释放,状态仍然是 PENDING(IN_PROGRESS 的 UPDATE 被 ROLLBACK)
T5: 下一次 RetryScheduler 扫描周期(5秒后)
→ 记录1/2/3 再次被捞出来,重新投递
→ 消息不丢为什么不直接用普通的 FOR UPDATE?
对比一下两种方案在多实例场景下的表现:
| 场景 | FOR UPDATE(无 SKIP LOCKED) | FOR UPDATE SKIP LOCKED |
|---|---|---|
| 实例 A 和 B 同时扫表 | B 阻塞等待 A 释放锁 | B 跳过 A 锁定的行,立即拿到可用记录 |
| 实例 A 处理慢 | B 一直等待,调度周期浪费 | B 拿到自己的记录,并行处理 |
| 实例 A 崩溃 | B 最终等到锁,正常处理 | 记录在 A 回滚后被下次扫描捞到 |
| 结果 | 串行处理,吞吐受限 | 并行处理,吞吐翻倍 |
为什么不用 Redis 分布式锁?
很多人在多实例场景下第一反应是引入 Redis 分布式锁。但这会带来额外复杂度:需要维护 Redis 集群、处理锁超时、处理锁续期、处理 Redis 宕机后的降级策略……而 SKIP LOCKED 直接利用数据库本身的能力,零额外组件,代码无需感知多实例的存在。
简单来说:如果并发控制粒度是数据库行级别,且已经用了 PostgreSQL,SKIP LOCKED 几乎总是比 Redis 分布式锁更好的选择。
4.9 消息字段映射(MessageMapper)
这部分代码量大但逻辑直接——就是做字段转换。挑几个有代表性的转换讲一下设计考量。
出入类型转换(反向映射)
Kafka 中 inoutType:0 = 出场,1 = 进场。第三方平台 enter_state:1 = 驶入,2 = 驶出。注意这两个枚举不是简单的 1:1 映射,而是反过来的:Kafka 的"出"(0) 对应平台的"驶出"(2),Kafka 的"进"(1) 对应平台的"驶入"(1)。这种不一致在对接第三方系统时非常常见,如果搞反了,车辆进出记录就会全部颠倒。
private String convertInoutType(Integer inoutType, long itemId) {
return switch (inoutType) {
case 0 -> "2"; // 出场 → 驶出
case 1 -> "1"; // 进场 → 驶入
default -> "1"; // 未知默认驶入
};
}可信度转换(小数 → 百分比整数)
Kafka 中的可信度是 01 的小数(如 0.854),平台要求 0100 的整数字符串(如 “85”)。注意这里用的是 Math.round 而不是强转 (int),因为 0.995 × 100 = 99.5,round 会得到 100,强转会得到 99。
private String convertCredible(Double credible, String fieldName, long itemId) {
if (credible == null) return "0";
int val = (int) Math.round(credible * 100);
return String.valueOf(Math.max(0, Math.min(100, val)));
}五、状态机设计
每条消息在 forward_records 表中的生命周期,本质上是一个有限状态机:
PENDING ──► IN_PROGRESS ──► SUCCESS (终态 ✓)
│
├──► PENDING (失败,等待重试)
│
└──► FAILED (终态 ✗,超过最大重试次数)5.1 为什么需要 IN_PROGRESS 状态?
初看之下,PENDING 和 SUCCESS/FAILED 三种状态就够了。加 IN_PROGRESS 是为了解决一个具体问题:区分"还没开始处理"和"正在处理中"。
想象一个场景:RetryScheduler 扫到了一条 PENDING 记录,正准备投递到虚拟线程。但就在同一瞬间,另一个 RetryScheduler 实例(多实例部署)也扫到了同一条记录。如果没有 IN_PROGRESS 状态,两个实例可能同时投递同一条消息。
有了 IN_PROGRESS 后,查询条件是 WHERE status = PENDING,不包括 IN_PROGRESS。一旦记录开始被处理,状态变为 IN_PROGRESS,其他调度器就不会再捞到它。
当然,在本项目的实现中,SKIP LOCKED 已经在数据库层面解决了并发问题。IN_PROGRESS 状态更像是一个应用层的"防重保险",双重保护。
5.2 为什么重试是原地更新而不是新建记录?
每次重试是对同一条记录的 UPDATE(attempt_count +1,next_retry_at 后移),而不是 INSERT 一条新记录。这样设计有三个好处:
好处一:幂等性天然保证。 因为整条消息的生命周期只有一条记录,messageKey 唯一索引只需要防住"首次入库"的重复即可。如果每次重试都新建记录,幂等逻辑会变得复杂得多。
好处二:状态追踪简单。 查一条记录就能看到当前是第几次尝试、上次失败原因是什么、下次什么时候重试。不需要关联查询多条记录再排序。
好处三:数据库压力更小。 一条消息从消费到成功,主表只产生 1 行记录(多次 UPDATE),而不是 N 行记录(多次 INSERT)。
5.3 双表设计:主表 + 重试日志表
虽然主表只保留最新状态,但每次尝试的详细过程被记录在 forward_retry_logs 表中:
forward_records (1) ────── (*) forward_retry_logs
id (雪花ID) ←──────── record_id为什么不用一张大表记录所有信息?因为主表需要被高频查询和更新(消费、转发、重试调度都在操作它),保持字段精简可以提升查询性能。而重试日志表是追加写入的,只在排查问题时才查询(“这条消息为什么失败了?第几次失败的?当时返回了什么错误码?"),不需要高频访问。
两表分离是"热点数据"和"冷数据"的典型分离模式。
六、监控指标
6.1 指标注册实现
监控系统基于 Micrometer,通过 MetricsService 在应用启动时注册自定义 Gauge 指标。Gauge 是一种"实时采样"指标——每次 Prometheus 来抓取数据时,Gauge 会调用提供的 lambda 表达式获取最新值。
@Slf4j
@Service
@RequiredArgsConstructor
public class MetricsService {
private final MeterRegistry meterRegistry;
private final ForwardRecordRepository repository;
private final Semaphore httpConcurrencySemaphore;
@PostConstruct
public void registerGauges() {
// 待转发记录数(含等待重试)
Gauge.builder("forwarder.records.pending",
() -> (double) repository.countByStatus(ForwardStatus.PENDING))
.description("待转发记录数(含等待重试)")
.register(meterRegistry);
// 当前 in-flight 的 HTTP 请求数
Gauge.builder("forwarder.records.in_progress",
() -> (double) repository.countByStatus(ForwardStatus.IN_PROGRESS))
.description("当前 HTTP 请求 in-flight 记录数")
.register(meterRegistry);
// 累计转发成功数
Gauge.builder("forwarder.records.success",
() -> (double) repository.countByStatus(ForwardStatus.SUCCESS))
.description("累计转发成功记录数")
.register(meterRegistry);
// 已耗尽重试次数的失败数(需人工干预)
Gauge.builder("forwarder.records.failed",
() -> (double) repository.countByStatus(ForwardStatus.FAILED))
.description("已耗尽重试次数的失败记录数(需人工干预)")
.register(meterRegistry);
// Semaphore 剩余许可数
Gauge.builder("forwarder.semaphore.available_permits",
() -> (double) httpConcurrencySemaphore.availablePermits())
.description("HTTP 并发 Semaphore 剩余许可数(为 0 表示满载)")
.register(meterRegistry);
log.info("Forwarder 自定义 Micrometer 指标注册完成");
}
}为什么用 Gauge 而不是 Counter?
Micrometer 有两种常见的指标类型:
- Counter:只增不减的计数器,适合统计"总请求数”、“总错误数"等累计值
- Gauge:可增可减的实时值,适合统计"当前积压量”、“当前在线数"等瞬时值
项目中 forwarder.records.pending 是一个会上下波动的值(消息消费后减少、转发失败后又增加),所以必须用 Gauge。而 forwarder.records.success 虽然只增不减,但用 Gauge 也没问题——Gauge 只是在每次抓取时执行一次 SELECT COUNT(*),返回当前值。
countByStatus 的实现
Repository 层只需要一个简单的 Spring Data JPA 方法,自动生成 SELECT COUNT(*) FROM forward_records WHERE status = ?:
@Repository
public interface ForwardRecordRepository extends JpaRepository<ForwardRecord, Long> {
long countByStatus(ForwardStatus status);
}性能考量:每次 Prometheus 抓取(默认 15 秒一次)会触发 5 次 SELECT COUNT(*) 查询。在数据量较大时(百万级以上),可以考虑:
- 用 PostgreSQL 的近似计数
SELECT reltuples FROM pg_class WHERE relname = 'forward_records'代替精确 COUNT - 或在应用层维护一个 AtomicLong 计数器,在状态变更时增减,避免每次查库
6.2 Prometheus 抓取配置
Spring Boot Actuator 已经暴露了 Prometheus 端点,application.yml 中的配置:
management:
endpoints:
web:
exposure:
include: health, info, prometheus, metrics
endpoint:
health:
show-details: alwaysPrometheus 在 prometheus.yml 中添加抓取任务:
scrape_configs:
- job_name: 'kafka-forwarder'
metrics_path: '/actuator/prometheus'
scrape_interval: 15s
static_configs:
- targets: ['192.168.18.175:8080']抓取到的指标原始数据长这样(/actuator/prometheus 端点输出):
# HELP forwarder_records_pending 待转发记录数(含等待重试)
# TYPE forwarder_records_pending gauge
forwarder_records_pending 12.0
# HELP forwarder_records_failed 已耗尽重试次数的失败记录数
# TYPE forwarder_records_failed gauge
forwarder_records_failed 2.0
# HELP forwarder_semaphore_available_permits HTTP 并发 Semaphore 剩余许可数
# TYPE forwarder_semaphore_available_permits gauge
forwarder_semaphore_available_permits 87.06.3 Grafana 面板关键 PromQL
以下是在 Grafana 中配置的核心看板和告警规则:
看板 1:消息积压趋势
# 最近 1 小时的待转发记录数变化
forwarder_records_pending
# 积压速率(每秒新增积压量,正数说明消费跟不上生产)
rate(forwarder_records_pending[5m])如果 rate 持续为正数,说明系统消费能力不足,需要考虑扩容实例数或调大 Semaphore 许可数。
看板 2:系统吞吐与健康度
# 成功记录的增长速率(每秒成功转发多少条)
rate(forwarder_records_success[5m])
# 当前 in-flight 数(应该 ≤ Semaphore 许可数)
forwarder_records_in_progress
# Semaphore 利用率(已用许可 / 总许可)
(100 - forwarder_semaphore_available_permits) / 100 * 100Semaphore 利用率长期接近 100% 说明 HTTP 转发是瓶颈,可以考虑调大 max-concurrency 或优化下游接口性能。
看板 3:失败记录告警
# 失败记录绝对值
forwarder_records_failed
# 失败记录增长速率
rate(forwarder_records_failed[10m])6.4 指标汇总表
| 指标 | 类型 | 数据源 | 告警建议 |
|---|---|---|---|
forwarder.records.pending | Gauge | SELECT COUNT(*) WHERE status='PENDING' | 持续增长 → 消费能力不足 |
forwarder.records.in_progress | Gauge | SELECT COUNT(*) WHERE status='IN_PROGRESS' | 应 ≤ Semaphore 许可数 |
forwarder.records.success | Gauge | SELECT COUNT(*) WHERE status='SUCCESS' | 关注 rate() 趋势 |
forwarder.records.failed | Gauge | SELECT COUNT(*) WHERE status='FAILED' | 大于 0 立即告警 |
forwarder.semaphore.available_permits | Gauge | semaphore.availablePermits() | 为 0 说明系统满载 |
为什么 forwarder.records.failed 大于 0 就应该告警?
每一条 FAILED 记录都意味着一条消息经过多次重试仍然无法送达。可能的原因包括:第三方平台长期故障、消息格式不兼容、账号认证过期等。这些都需要人工介入处理,所以应该第一时间告警。
处理 FAILED 记录的方式通常是:修复问题后,手动将记录状态改回 PENDING,让 RetryScheduler 自动重新投递:
-- 手动恢复 FAILED 记录,触发重新投递
UPDATE forward_records
SET status = 'PENDING',
next_retry_at = NULL,
attempt_count = 0
WHERE status = 'FAILED'
AND id = <目标记录ID>;七、生产部署
- Schema 迁移:将
ddl-auto改为validate,使用 Flyway/Liquibase 管理表结构变更,避免 Hibernate 自动建表带来的不可控风险 - 定期清理:SUCCESS 记录按策略归档删除(建议保留 7~30 天),避免表膨胀影响查询性能
- FAILED 告警:监控
forwarder.records.failed指标,超过阈值触发告警,人工介入处理 - 多实例部署:
SKIP LOCKED已天然支持多实例并行调度,无需额外配置,直接水平扩展即可 - 死信队列:开启
forwarder.dead-letter.enabled: true,FAILED 记录同步写入 Kafka DLQ,方便集中消费和处理
八、总结
这个项目虽然体量不大,但涉及的工程问题很有代表性。
| 问题 | 解决方案 | 核心思想 |
|---|---|---|
| 消息不能丢 | 先写 DB 再转发 + 持久化重试调度 | 持久化优先,处理在后 |
| Kafka 重复消费 | messageKey 唯一索引幂等去重 | 业务 ID 做幂等,而非 offset |
| 高并发 HTTP | 虚拟线程 + Semaphore 背压 | 低成本并发 + 流量控制 |
| 分布式重试安全 | SELECT ... FOR UPDATE SKIP LOCKED | 数据库级别的分布式锁 |
| 主键冲突 Bug | REQUIRES_NEW 独立事务拆分 | 事务边界与异步边界不能混用 |
| 多接口路由 | ApiType 枚举 + MessageMapper 策略映射 | 类型驱动路由 |
| 重试风暴 | 指数退避 + 上限控制 | 逐步增加重试间隔 |
最重要的一条:在消息不丢这个硬约束下,数据库不是瓶颈而是保障。先持久化再处理的模式虽然增加了 DB 写入开销,但换来的是确定性的数据安全。性能问题可以通过连接池调优、批量查询优化等手段逐步改善,但数据丢失是无法弥补的。
附录:系统完整流程图
A. 消息处理主流程
┌─────────────────────────────────────────────────────────────────────────────┐
│ Kafka Broker │
│ topic: device-vehicle-event / device-heartbeat │
└──────────────────────────────────┬──────────────────────────────────────────┘
│ batch poll (max 500 records)
│ concurrency = 3 (分区线程)
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ DeviceDataKafkaListener │
│ │
│ for each record in batch: │
│ 1. extractMessageId() ← 从 JSON 提取 messageId │
│ 2. 构建 ForwardRecord ← messageKey = messageId │
│ 3. dispatcher.dispatchAsync() ← 幂等入库 + 异步投递 │
│ │
│ ack.acknowledge() ← 全部落库后才提交 offset │
└──────────────────────────────────┬──────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ ForwardDispatcher │
│ │
│ ┌─────────────────────┐ ┌─────────────────────────────────────────────┐ │
│ │ 幂等检查 │ │ messageKey 不存在(首次消费) │ │
│ │ SELECT by messageKey│ │ │ │
│ │ │ │ 1. snowflakeIdGenerator.nextId() │ │
│ │ 已存在: │ │ 2. saveNewRecord() ← REQUIRES_NEW 独立事务│ │
│ │ ├─ SUCCESS → 跳过 │ │ 3. submitToVirtualThread() │ │
│ │ ├─ FAILED → 跳过 │ │ │ │
│ │ └─ PENDING → 重新 │ └──────────────────────┬──────────────────────┘ │
│ │ 投递 │ │ │
│ └─────────────────────┘ │ │
└────────────────────────────────────────────────────┼──────────────────────┘
│
▼
┌──────────────────────────────────────┐
│ Semaphore tryAcquire (背压阀门) │
│ max-concurrency = 100 │
│ timeout = 5000ms │
└──────────┬───────────────┬───────────┘
│ │
获取成功 获取失败(超时)
│ │
▼ ▼
┌──────────────┐ 记录保持 PENDING
│ 提交到虚拟 │ 等 RetryScheduler
│ 线程执行器 │ 下次扫描处理
│ │
│ Executors │
│ .newVirtual │
│ ThreadPer │
│ TaskExecutor │
└──────┬───────┘
│
▼
┌─────────────────────────────────────────────────────────────────────────────┐
│ HttpForwardService │
│ (在虚拟线程中执行) │
│ │
│ 1. UPDATE status = IN_PROGRESS, attempt_count + 1 │
│ │
│ 2. PlatformRequestBuilder.build(record) │
│ ├─ 反序列化 Kafka JSON (KafkaVehicleEventMessage / KafkaHeartbeatMsg) │
│ ├─ MessageMapper 字段映射转换 │
│ │ ├─ inoutType: 0→"2"(驶出), 1→"1"(驶入) │
│ │ ├─ plateCredible: 0.854 → "85" │
│ │ ├─ picType: 1→"1", 2→"2", 3→"0", 4→"2" │
│ │ └─ picUrl: jieshun/xxx.jpg → /jieshun/xxx.jpg │
│ ├─ 注入 access_name / access_pwd │
│ └─ 构建 HttpRequest (POST JSON) │
│ │
│ 3. JDK HttpClient.send() ──── HTTP/2 ────► 第三方平台 API │
│ ├─ /mch/business (车辆出入事件) │
│ └─ /mch/heartbeat (心跳抓拍图) │
│ │
│ 4. 根据响应判断结果: │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ HTTP 非 2xx / 超时 / 异常 │ │
│ │ → scheduleRetry(): status=PENDING, 计算指数退避 │ │
│ │ → 超过 maxAttempts → FAILED (终态) │ │
│ ├──────────────────────────────────────────────────────────┤ │
│ │ HTTP 2xx + result=0 │ │
│ │ → SUCCESS (终态) │ │
│ ├──────────────────────────────────────────────────────────┤ │
│ │ HTTP 2xx + errcode=4 (重复上报) │ │
│ │ → SUCCESS (视为成功,平台已幂等处理) │ │
│ ├──────────────────────────────────────────────────────────┤ │
│ │ HTTP 2xx + errcode=998 (无需重发) │ │
│ │ → FAILED (终态,不重试) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │
│ 5. 写入 forward_retry_logs (每次尝试的详细日志) │
│ │
│ 6. Semaphore.release() ← 释放许可 │
└─────────────────────────────────────────────────────────────────────────────┘B. RetryScheduler 重试流程
┌─────────────────────────────────────────────────────────────────────────────┐
│ RetryScheduler (@Scheduled, 每 5s) │
│ │
│ SELECT * FROM forward_records │
│ WHERE status = 'PENDING' │
│ AND (next_retry_at IS NULL OR next_retry_at <= NOW()) │
│ ORDER BY next_retry_at ASC NULLS FIRST, id ASC │
│ LIMIT 100 │
│ FOR UPDATE SKIP LOCKED ◄─── 行级排他锁 + 跳过已锁定行 │
│ │
│ 多实例场景: │
│ ┌──────────┐ ┌──────────┐ ┌──────────┐ │
│ │ 实例 A │ │ 实例 B │ │ 实例 C │ │
│ │ 锁定 │ │ SKIP │ │ SKIP │ │
│ │ 记录 1~ │ │ 记录 1~ │ │ 记录 1~ │ │
│ │ 100 │ │ 100, │ │ 100, │ │
│ │ │ │ 锁定 │ │ 101~ │ │
│ │ │ │ 记录 │ │ 200, │ │
│ │ │ │ 101~200 │ │ 锁定 │ │
│ │ │ │ │ │ 记录 │ │
│ │ │ │ │ │ 201~300 │ │
│ └──────────┘ └──────────┘ └──────────┘ │
│ │ │ │ │
│ └──────────────────┴──────────────────┘ │
│ │ │
│ ▼ │
│ 每条记录 → dispatcher.dispatchRetry() │
│ │ │
│ ▼ │
│ submitToVirtualThread() │
│ (与主流程相同,经过 Semaphore → 虚拟线程 → HTTP 转发) │
└─────────────────────────────────────────────────────────────────────────────┘C. 状态机流转图
┌────────────────────────────────────────────┐
│ │
▼ │
┌─────────┐ HTTP开始转发 ┌──────────────┐ HTTP成功 ┌─────────┐
│ │ ───────────────► │ │ ─────────────► │ │
│ PENDING │ │ IN_PROGRESS │ │ SUCCESS │
│ │ ◄─────────────── │ │ │ (终态) │
└────┬────┘ 转发失败 └──────┬───────┘ └─────────┘
│ (attempt < max) │
│ │ 转发失败
│ │ (attempt >= max)
│ ▼
│ ┌─────────┐
│ │ │
└──────────────────────► │ FAILED │
RetryScheduler │ (终态) │
重新投递 └─────────┘
生命周期示例 (maxAttempts=5, initial=5s, multiplier=2):
T=0s Kafka 消息到达 → INSERT (status=PENDING, attempt=0)
T=0s 首次投递 → UPDATE (status=IN_PROGRESS, attempt=1) → HTTP 请求
T=0s HTTP 失败 → UPDATE (status=PENDING, next_retry_at=T+5s)
T=5s RetryScheduler 扫描 → 投递 → (status=IN_PROGRESS, attempt=2) → HTTP
T=5s HTTP 失败 → (status=PENDING, next_retry_at=T+15s) // 5×2=10s
T=15s RetryScheduler 扫描 → 投递 → (status=IN_PROGRESS, attempt=3) → HTTP
T=15s HTTP 失败 → (status=PENDING, next_retry_at=T+35s) // 10×2=20s
T=35s RetryScheduler 扫描 → 投递 → (status=IN_PROGRESS, attempt=4) → HTTP
T=35s HTTP 失败 → (status=PENDING, next_retry_at=T+75s) // 20×2=40s
T=75s RetryScheduler 扫描 → 投递 → (status=IN_PROGRESS, attempt=5) → HTTP
T=75s HTTP 失败 → (status=FAILED) // attempt=5 >= maxAttempts=5, 终态D. 崩溃恢复场景
场景 1:落库后、提交 offset 前 JVM 崩溃
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
Kafka 消息到达
│
▼
INSERT 到 DB (status=PENDING) ← 已持久化
│
▼
💥 JVM 崩溃!
│
▼
Kafka offset 未提交 → 重启后 Kafka 重新投递
│
▼
消费端幂等检查: messageKey 已存在 + status=PENDING
│
▼
不重复 INSERT,直接重新投递到虚拟线程 ✅ 消息不丢
场景 2:提交 offset 后、HTTP 转发前 JVM 崩溃
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
INSERT 到 DB + 提交 offset
│
▼
💥 JVM 崩溃!
│
▼
DB 中存在 PENDING 记录 (next_retry_at=NULL)
│
▼
RetryScheduler 5秒后扫到 → 重新投递 ✅ 消息不丢
场景 3:HTTP 转发过程中 JVM 崩溃
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
UPDATE (status=IN_PROGRESS) + HTTP 请求发出
│
▼
💥 JVM 崩溃!
│
▼
情况 A: HTTP 请求未到达第三方
→ RetryScheduler 重新投递 ✅ 消息不丢
情况 B: HTTP 请求已到达,第三方已处理
→ RetryScheduler 重新投递
→ 第三方返回 errcode=4 (重复上报)
→ 标记为 SUCCESS ✅ 消息不丢,且不重复
场景 4:HTTP 转发成功后、DB UPDATE 前 JVM 崩溃
━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━
HTTP 请求成功 (第三方已处理)
│
▼
💥 JVM 崩溃!(status 仍然是 IN_PROGRESS)
│
▼
RetryScheduler 重新投递
→ 第三方返回 errcode=4 (重复上报)
→ 标记为 SUCCESS ✅ 消息不丢,且不重复E. 数据库表关系
┌──────────────────────────────────────┐ ┌──────────────────────────────────────┐
│ forward_records (主表) │ │ forward_retry_logs (日志表) │
├──────────────────────────────────────┤ ├──────────────────────────────────────┤
│ id BIGINT (PK, 雪花ID) │ │ id BIGINT (PK, 雪花ID) │
│ message_key VARCHAR(64) (UNIQUE) │◄──────│ record_id BIGINT (FK → 主表) │
│ api_type VARCHAR(30) │ │ attempt_number INT │
│ kafka_topic VARCHAR(255) │ │ result_status VARCHAR(20) │
│ kafka_partition INT │ │ http_status INT │
│ kafka_offset BIGINT │ │ platform_errcode VARCHAR(20) │
│ kafka_key VARCHAR(512) │ │ error_message VARCHAR(1000) │
│ payload TEXT │ │ duration_ms BIGINT │
│ status VARCHAR(20) │ │ attempted_at TIMESTAMP │
│ attempt_count INT │ │ next_retry_at TIMESTAMP │
│ next_retry_at TIMESTAMP │ └──────────────────────────────────────┘
│ last_http_status INT │
│ last_error VARCHAR(1000) │ 关系:forward_records (1) ←→ (*) forward_retry_logs
│ created_at TIMESTAMP │
│ updated_at TIMESTAMP │ 主表:当前状态快照(高频读写)
│ succeeded_at TIMESTAMP │ 日志表:每次尝试详情(追加写入,低频查询)
└──────────────────────────────────────┘
索引:
idx_fr_message_key → message_key (UNIQUE) 幂等去重
idx_fr_status_next_retry → status, next_retry_at RetryScheduler 扫描
idx_fr_created_at → created_at 定期清理
idx_frl_record_id → record_id 查询重试历史