异步回调队列:从内存到分布式的演进之路

跨进程任务协调的技术原理与实践

December 26, 2024·16 min read·Yimin
#异步#回调#分布式#Redis PubSub#消息队列

当等待方和通知方不在同一个进程时,如何优雅地协调?本文带你理解异步回调队列的核心原理。

🎯 为什么需要异步回调队列?

真实场景:等待外部系统响应

假设你有一个服务,需要:

  1. 接收用户请求
  2. 调用外部系统执行操作(可能需要几分钟)
  3. 等待外部系统回调通知完成
  4. 将结果返回给用户
时间轴 →

用户请求:     [发起请求] ──────────────────────────────────── [收到结果]
                 ↓                                              ↑
服务端:       [接收] → [调用外部系统] → [等待回调...] → [处理结果] → [返回]
                              ↓                ↑
外部系统:                  [执行中...] ────→ [回调通知]

问题:如何让"发起请求"的代码等待"回调通知"?

这两个动作可能发生在:

  • 不同的协程
  • 不同的线程
  • 不同的进程
  • 甚至不同的服务器

核心挑战

┌─────────────────────────────────────────────────────────────────┐
│                     典型的长时任务场景                           │
│                                                                 │
│  1. 用户请求 → 服务端开始处理                                    │
│  2. 服务端调用外部系统 → 外部系统开始执行(耗时几分钟)           │
│  3. 服务端需要"等待"外部系统完成                                 │
│  4. 外部系统完成后"通知"服务端                                   │
│  5. 服务端将结果返回给用户                                       │
│                                                                 │
│  关键问题:                                                      │
│  ┌───────────────────────────────────────────────────────────┐  │
│  │ "等待"的代码 和 "通知"的代码 如何协调?                     │  │
│  │ 它们可能在不同进程/不同机器上!                             │  │
│  └───────────────────────────────────────────────────────────┘  │
└─────────────────────────────────────────────────────────────────┘

没有协调机制,你的系统就像两个无法沟通的人。


🔍 技术演进:从单机到分布式

阶段一:内存队列(单进程方案)

为什么这样做?

最简单直接的方案:用编程语言内置的异步队列。

┌─────────────────────────────────────────────────────────────┐
│                    单个 Python 进程                          │
│                                                             │
│  全局队列字典:                                               │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  queues = {                                         │    │
│  │    "session-1": Queue(),                            │    │
│  │    "session-2": Queue(),                            │    │
│  │  }                                                  │    │
│  └─────────────────────────────────────────────────────┘    │
│                                                             │
│  协程 A (等待方):                                            │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  item = await queues["session-1"].get()             │    │
│  │  # 阻塞等待,直到有人放入数据                         │    │
│  └─────────────────────────────────────────────────────┘    │
│                          ↑                                  │
│                          │ 消息传递                          │
│                          ↓                                  │
│  协程 B (通知方):                                            │
│  ┌─────────────────────────────────────────────────────┐    │
│  │  await queues["session-1"].put(result)              │    │
│  │  # 放入数据,唤醒等待的协程                           │    │
│  └─────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────┘

工作原理

  1. 创建队列:每个会话(session)对应一个独立的内存队列
  2. 等待方:调用 queue.get() 阻塞等待
  3. 通知方:调用 queue.put() 放入数据,唤醒等待方
  4. 清理:任务完成后从字典中删除队列

效果

  • ✅ 实现简单,几十行代码搞定
  • ✅ 性能极高(纯内存操作,微秒级)
  • ✅ 无外部依赖

致命问题

┌─────────────────────────────────────────┐
│          进程 A (处理用户请求)           │
│                                         │
│  queues = { "session-1": Queue() }     │
│                                         │
│  协程等待: await queue.get()            │
│           → 永远等不到!                 │
└─────────────────────────────────────────┘

┌─────────────────────────────────────────┐
│          进程 B (处理回调请求)           │
│                                         │
│  queues = {}  ← 完全独立!              │
│                                         │
│  queue.put(result)                      │
│           → 找不到队列!                 │
└─────────────────────────────────────────┘

内存是进程私有的!

  • 进程 A 的字典和进程 B 的字典是两个完全独立的对象
  • 无法跨进程通信
  • 分布式部署时完全失效

阶段二:Redis PubSub(分布式方案)

为什么选择 Redis PubSub?

需要一个跨进程/跨机器的消息传递机制,Redis 的发布/订阅(Pub/Sub)模式正好满足需求。

┌─────────────────────────────────────────────────────────────────┐
│                      Redis PubSub 原理                          │
│                                                                 │
│                     ┌───────────────────┐                       │
│                     │   Redis 服务器    │                       │
│                     │                   │                       │
│                     │  频道: channel-1  │                       │
│                     │  订阅者: [A]      │                       │
│                     │                   │                       │
│                     │  频道: channel-2  │                       │
│                     │  订阅者: [B, C]   │                       │
│                     └─────────┬─────────┘                       │
│                               │                                 │
│              ┌────────────────┼────────────────┐                │
│              │                │                │                │
│              ▼                ▼                ▼                │
│         ┌────────┐       ┌────────┐       ┌────────┐            │
│         │ 进程 A │       │ 进程 B │       │ 进程 C │            │
│         │ (订阅) │       │ (订阅) │       │ (发布) │            │
│         └────────┘       └────────┘       └────────┘            │
│                                                                 │
│  发布/订阅模式:                                                  │
│  - 订阅者订阅某个频道,等待消息                                   │
│  - 发布者向频道发布消息                                          │
│  - Redis 将消息广播给该频道的所有订阅者                           │
└─────────────────────────────────────────────────────────────────┘

工作原理

1. 订阅(Subscribe)

等待方订阅一个唯一的频道(如 callback:session-123):

进程 A:
  → 订阅频道 "callback:session-123"
  → 阻塞等待消息...

2. 发布(Publish)

通知方向频道发布消息:

进程 B:
  → 向频道 "callback:session-123" 发布消息
  → Redis 将消息推送给所有订阅者

3. 接收

等待方收到消息,继续处理:

进程 A:
  → 收到消息
  → 继续后续逻辑

为什么能跨进程/跨机器?

┌─────────────────────────────────────────────────────────────────┐
│                        关键洞察                                  │
│                                                                 │
│  内存队列:  数据存在进程内存中                                    │
│            → 进程私有,无法共享                                   │
│                                                                 │
│  Redis:    数据存在 Redis 服务器中                               │
│            → 所有进程连接同一个 Redis                             │
│            → 通过网络通信,实现跨进程共享                          │
│                                                                 │
│  ┌─────────┐     ┌─────────┐     ┌─────────┐                    │
│  │ 进程 A  │     │ 进程 B  │     │ 进程 C  │                    │
│  └────┬────┘     └────┬────┘     └────┬────┘                    │
│       │              │              │                           │
│       │   TCP 连接   │              │                           │
│       └──────────────┼──────────────┘                           │
│                      │                                          │
│                      ▼                                          │
│              ┌───────────────┐                                  │
│              │ Redis 服务器  │                                  │
│              │ (共享状态)    │                                  │
│              └───────────────┘                                  │
└─────────────────────────────────────────────────────────────────┘

效果

  • ✅ 跨进程/跨机器通信
  • ✅ 支持分布式部署
  • ✅ 数据不会因进程重启丢失
  • ⚠️ 增加了网络延迟(毫秒级)
  • ⚠️ 需要维护 Redis 服务

阶段三:连接管理优化

遇到的问题

基础的 Redis PubSub 实现有几个性能问题:

┌─────────────────────────────────────────────────────────────────┐
│                       问题 1: 连接膨胀                           │
│                                                                 │
│  每个订阅操作可能创建新的 PubSub 连接:                            │
│                                                                 │
│  协程 1: subscribe("channel-1") → 连接 1                        │
│  协程 2: subscribe("channel-2") → 连接 2                        │
│  协程 3: subscribe("channel-3") → 连接 3                        │
│  ...                                                            │
│  协程 N: subscribe("channel-N") → 连接 N                        │
│                                                                 │
│  问题: 高并发时连接数爆炸!                                       │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                       问题 2: 消息接收效率低                      │
│                                                                 │
│  每个协程独立轮询消息:                                            │
│                                                                 │
│  协程 1: while True: message = pubsub.get_message()             │
│  协程 2: while True: message = pubsub.get_message()             │
│  协程 3: while True: message = pubsub.get_message()             │
│                                                                 │
│  问题: CPU 资源浪费!                                            │
└─────────────────────────────────────────────────────────────────┘

┌─────────────────────────────────────────────────────────────────┐
│                       问题 3: 消息语义不清                        │
│                                                                 │
│  所有情况都返回 None:                                            │
│  - 超时 → None                                                  │
│  - 错误 → None                                                  │
│  - 取消订阅 → None                                              │
│                                                                 │
│  问题: 调用方无法区分不同情况!                                   │
└─────────────────────────────────────────────────────────────────┘

解决方案:单连接 + 后台分发

┌─────────────────────────────────────────────────────────────────┐
│                     优化后的架构                                 │
│                                                                 │
│  ┌─────────────────────────────────────────────────────────┐    │
│  │                   PubSub 管理器                          │    │
│  │                                                         │    │
│  │  单例 Redis PubSub 连接 ───────────────────┐            │    │
│  │                                            │            │    │
│  │  后台接收任务 ─────┐                        │            │    │
│  │       │           │                        │            │    │
│  │       ▼           │                        │            │    │
│  │  ┌──────────────────────────────────────┐  │            │    │
│  │  │ while True:                          │  │            │    │
│  │  │   message = await pubsub.get()       │←─┘            │    │
│  │  │   channel = message.channel          │               │    │
│  │  │   queue = channel_queues[channel]    │               │    │
│  │  │   queue.put(message)                 │               │    │
│  │  └──────────────────────────────────────┘               │    │
│  │                    │                                    │    │
│  │                    │ 分发                                │    │
│  │                    ▼                                    │    │
│  │  ┌─────────────────────────────────────────────────┐    │    │
│  │  │  channel_queues = {                             │    │    │
│  │  │    "channel-1": Queue() ← 协程 1 在等待         │    │    │
│  │  │    "channel-2": Queue() ← 协程 2 在等待         │    │    │
│  │  │    "channel-3": Queue() ← 协程 3 在等待         │    │    │
│  │  │  }                                              │    │    │
│  │  └─────────────────────────────────────────────────┘    │    │
│  └─────────────────────────────────────────────────────────┘    │
└─────────────────────────────────────────────────────────────────┘

核心思想

1. 单连接复用

所有订阅共享一个 Redis PubSub 连接:

  • 减少连接数
  • 降低 Redis 服务器压力
  • 简化连接管理

2. 后台统一接收

单个后台任务负责:

  • 持续从 Redis 拉取消息
  • 根据频道名分发到对应的本地队列
  • 唤醒等待该频道的协程

3. 本地队列缓冲

每个频道对应一个本地 asyncio.Queue:

  • 等待方从本地队列获取消息(高效)
  • 后台任务向本地队列投递消息
  • 解耦网络 IO 和业务逻辑

4. 结构化消息

返回结构化的消息对象,明确区分:

  • message: 正常收到消息
  • timeout: 等待超时
  • error: 发生错误
  • unsubscribe: 订阅被取消

效果

  • ✅ 连接数固定为 1(无论多少订阅)
  • ✅ CPU 利用率提升(单任务接收,无重复轮询)
  • ✅ 消息语义清晰(可区分不同状态)
  • ✅ 高并发友好

🏛️ 技术要点详解

Redis PubSub 的特性

1. 消息是广播的

┌─────────────────────────────────────────────────────────────────┐
│                        广播特性                                  │
│                                                                 │
│  发布者发送一条消息 → 所有订阅者都会收到                          │
│                                                                 │
│  场景:                                                          │
│    频道 "updates" 有 3 个订阅者                                  │
│    发布者发送 "hello"                                            │
│    → 订阅者 A 收到 "hello"                                       │
│    → 订阅者 B 收到 "hello"                                       │
│    → 订阅者 C 收到 "hello"                                       │
│                                                                 │
│  注意: 在异步回调场景中,通常每个频道只有一个订阅者                 │
│        (因为频道名包含唯一的 session_id)                        │
└─────────────────────────────────────────────────────────────────┘

2. 消息不持久化

┌─────────────────────────────────────────────────────────────────┐
│                      不持久化特性                                │
│                                                                 │
│  发布时如果没有订阅者 → 消息丢失                                  │
│                                                                 │
│  时序:                                                          │
│    T1: 发布者发送消息 → 无订阅者 → 消息丢弃!                     │
│    T2: 订阅者订阅频道 → 收不到 T1 的消息                          │
│                                                                 │
│  应对策略:                                                       │
│    - 确保先订阅,再触发发布                                       │
│    - 或使用 Redis Streams(支持持久化)                          │
└─────────────────────────────────────────────────────────────────┘

3. 订阅是阻塞的

┌─────────────────────────────────────────────────────────────────┐
│                      阻塞特性                                    │
│                                                                 │
│  订阅后,连接进入"订阅模式":                                     │
│    - 只能执行 SUBSCRIBE/UNSUBSCRIBE/PSUBSCRIBE 等命令            │
│    - 不能执行 GET/SET 等普通命令                                  │
│                                                                 │
│  这就是为什么需要单独的 PubSub 连接,而不是复用普通连接            │
└─────────────────────────────────────────────────────────────────┘

判断订阅者是否存在

PUBSUB NUMSUB 命令

┌─────────────────────────────────────────────────────────────────┐
│                   PUBSUB NUMSUB 命令                             │
│                                                                 │
│  作用: 返回指定频道的订阅者数量                                   │
│                                                                 │
│  用途: 发布前检查是否有人在等待                                   │
│        - 有订阅者 → 发布消息                                     │
│        - 无订阅者 → 返回失败/跳过                                 │
│                                                                 │
│  示例:                                                          │
│    PUBSUB NUMSUB "callback:session-123"                         │
│    → 返回: ["callback:session-123", 1]  (有 1 个订阅者)          │
│    → 或者: ["callback:session-123", 0]  (无订阅者)               │
└─────────────────────────────────────────────────────────────────┘

应用场景

┌─────────────────────────────────────────────────────────────────┐
│                   防止重复处理                                   │
│                                                                 │
│  场景: 同一个任务可能被多次触发                                   │
│                                                                 │
│  流程:                                                          │
│    1. 新任务到来                                                 │
│    2. 检查 PUBSUB NUMSUB("task:xxx")                            │
│       - > 0 → 已有人在处理,返回"任务进行中"                      │
│       - = 0 → 无人处理,开始处理并订阅频道                        │
│    3. 处理完成后取消订阅                                         │
│                                                                 │
│  效果: 确保同一任务只有一个处理者                                 │
└─────────────────────────────────────────────────────────────────┘

取消订阅时唤醒等待者

问题

┌─────────────────────────────────────────────────────────────────┐
│                      问题场景                                    │
│                                                                 │
│  协程 A:                                                        │
│    item = await queue.get()  # 阻塞等待中...                     │
│                                                                 │
│  此时任务被取消,需要取消订阅:                                    │
│    - 如果直接取消订阅,协程 A 会永远阻塞!                        │
│    - 因为没有人往队列里放东西                                    │
└─────────────────────────────────────────────────────────────────┘

解决方案

┌─────────────────────────────────────────────────────────────────┐
│                      唤醒机制                                    │
│                                                                 │
│  取消订阅时:                                                     │
│    1. 向本地队列投递一个特殊值(如 None)                         │
│    2. 等待中的协程收到 None,识别为"取消订阅"信号                 │
│    3. 协程退出等待,进行清理                                     │
│                                                                 │
│  流程:                                                          │
│    unsubscribe("channel-1"):                                    │
│      queue = channel_queues["channel-1"]                        │
│      queue.put(None)  # 唤醒等待者                               │
│      del channel_queues["channel-1"]                            │
│      redis.unsubscribe("channel-1")                             │
└─────────────────────────────────────────────────────────────────┘

🎯 架构对比

三种方案的对比

┌─────────────────────────────────────────────────────────────────┐
│                        方案对比                                  │
│                                                                 │
│  ┌────────────────────────────────────────────────────────────┐ │
│  │ 方案一: 内存队列                                            │ │
│  │                                                            │ │
│  │   进程 ──→ 内存队列 ──→ 进程                                │ │
│  │                                                            │ │
│  │   ✅ 简单、快速                                            │ │
│  │   ❌ 仅单进程                                              │ │
│  └────────────────────────────────────────────────────────────┘ │
│                                                                 │
│  ┌────────────────────────────────────────────────────────────┐ │
│  │ 方案二: Redis PubSub (基础版)                               │ │
│  │                                                            │ │
│  │   进程 A ──→ Redis ──→ 进程 B                              │ │
│  │   (多连接)                                                 │ │
│  │                                                            │ │
│  │   ✅ 跨进程/跨机器                                         │ │
│  │   ⚠️ 连接数多、效率一般                                    │ │
│  └────────────────────────────────────────────────────────────┘ │
│                                                                 │
│  ┌────────────────────────────────────────────────────────────┐ │
│  │ 方案三: Redis PubSub (优化版)                               │ │
│  │                                                            │ │
│  │   进程 ──→ 单连接 ──→ Redis ──→ 后台任务 ──→ 本地队列       │ │
│  │                                                            │ │
│  │   ✅ 跨进程/跨机器                                         │ │
│  │   ✅ 单连接、高效率                                        │ │
│  │   ✅ 语义清晰                                              │ │
│  └────────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘

性能对比

指标内存队列Redis PubSub (基础)Redis PubSub (优化)
延迟微秒级毫秒级毫秒级
连接数0N (每订阅一个)1 (固定)
跨进程
跨机器
CPU 效率⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
内存效率⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐

🚨 常见问题与解决方案

1. 消息丢失

问题:发布时没有订阅者

时序:
  T1: 发布者发送消息 → 无订阅者 → 消息丢失!
  T2: 订阅者订阅 → 收不到 T1 的消息

解决方案:确保订阅在前

正确流程:
  T1: 订阅者先订阅频道
  T2: 触发外部操作(外部系统会发送回调)
  T3: 外部系统回调 → 发布消息 → 订阅者收到

2. 订阅泄漏

问题:异常退出时没有取消订阅

场景:
  协程订阅频道 → 处理过程中抛异常 → 没有执行取消订阅
  → Redis 仍认为有订阅者
  → 资源泄漏

解决方案:使用上下文管理器

正确做法:
  with subscribe("channel") as queue:
      # 处理逻辑
      pass
  # 无论正常还是异常退出,都会自动取消订阅

3. 超时处理

问题:等待超时后如何处理?

场景:
  订阅者等待 30 秒后超时
  → 需要区分"超时"和"收到消息"
  → 基础实现都返回 None,无法区分

解决方案:结构化返回值

返回结构化消息:
  - type: "message" → 正常收到消息
  - type: "timeout" → 等待超时
  - type: "error" → 发生错误
  - type: "unsubscribe" → 订阅被取消

调用方可根据 type 做不同处理

4. 并发安全

问题:多个协程同时操作同一个频道

场景:
  协程 A: 检查频道不存在 → 创建队列
  协程 B: 检查频道不存在 → 创建队列 (覆盖 A 的队列!)
  协程 A: 使用队列 → 用的是 B 创建的队列
  → 数据混乱

解决方案:加锁保护

正确做法:
  with lock:
      if channel not in queues:
          queues[channel] = Queue()
          redis.subscribe(channel)
  
  # 确保同一频道的初始化是原子的

🛠️ 最佳实践

1. 频道命名规范

推荐格式: {业务类型}:{唯一标识}

示例:
  - callback:session-12345
  - notification:user-6789
  - task:job-abcdef

好处:
  - 便于区分不同业务
  - 便于监控和排查
  - 避免命名冲突

2. 合理设置超时

原则: 超时时间 = 预期耗时 × 2 ~ 3

示例:
  - 外部操作预计 30 秒 → 超时设置 60-90 秒
  - 太短: 可能误判为失败
  - 太长: 用户等待过久

3. 优雅的清理机制

任务完成后:
  1. 取消订阅(释放 Redis 资源)
  2. 清理本地队列(释放内存)
  3. 记录日志(便于排查)

使用上下文管理器自动处理清理逻辑

4. 日志记录

关键日志点:
  - 订阅成功: [PUBSUB] Subscribed to channel-xxx
  - 收到消息: [PUBSUB] Received message on channel-xxx
  - 超时: [PUBSUB] Timeout waiting for channel-xxx
  - 取消订阅: [PUBSUB] Unsubscribed from channel-xxx

便于:
  - 排查问题
  - 监控系统状态
  - 性能分析

🚀 总结

核心价值

没有异步回调机制有异步回调机制
🙈 无法等待外部系统响应✅ 优雅地等待回调
😰 只能轮询查询状态✅ 被动接收通知
🔮 分布式场景失效✅ 跨进程/跨机器协调
💸 资源浪费💰 高效的消息传递

技术演进路线

内存队列 (单进程)
    │
    │  问题: 无法跨进程
    │
    ▼
Redis PubSub (分布式)
    │
    │  问题: 连接膨胀、效率低
    │
    ▼
Redis PubSub + 单连接优化 (高效分布式)
    │
    │  ✅ 单连接
    │  ✅ 后台分发
    │  ✅ 本地缓冲
    │  ✅ 语义清晰
    │
    ▼
生产可用

记忆口诀

🏠 内存队列: 快但单机
🌐 Redis PubSub: 跨机但要优化
📬 后台分发: 单连接 + 本地队列
🔔 消息语义: message/timeout/error/unsubscribe

选型建议

场景分析:

单进程内的协程间通信
  → 内存队列 (asyncio.Queue)

跨进程/跨机器的异步回调
  → Redis PubSub

高并发 + 跨进程
  → Redis PubSub + 单连接优化

需要消息持久化
  → Redis Streams 或 消息队列 (RabbitMQ/Kafka)

📚 延伸阅读


异步回调队列是分布式系统中协调长时任务的基础设施。

选择合适的技术方案,让你的系统既能优雅地等待,又能高效地响应。 🚀

现在,你的系统是如何处理异步回调的? 🤔

异步回调队列:从内存到分布式的演进之路