Msg
以 两个消费者 A,B 订阅同一 channel 为例, 共计 10 个任务, 耗时各不同
我以为的轮询是 任一消费者执行完任务, 自动拉取下一个消息 观察到的现象并非如此
- A 消费者拉取到 5 个简单任务很快就执行结束了
- B 消费者阻塞在第一个任务, 此时 A 也拉取不到剩余的任务了
默认的轮询: 平均分配
默认情况下, RabbitMQ 不关心消费者 A 和 B 处理消息快不快, 它只负责分发. 当队列中有 10 个消息时, mq 会像发牌一样, 按顺序一次性把消息分配出去
消息1 -> 给A
消息2 -> 给B
消息3 -> 给A
消息4 -> 给B
...
消息9 -> 给A
消息10 -> 给B
这个分配过程是在消息进入队列的瞬间 (或消费者连接的瞬间) 极快完成的
消息预取 (prefetch) 与 缓存
一旦消息分发给消费者, 这些消息就离开了 mq 队列 (ready 状态), 进入了某个消费者的本地内存缓冲区 (tcp buffer / client buffer)
虽然 B 还在处理 ” 消息 2” (耗时旧), 但 ” 消息 4,6,8,10” 已经在 B 的缓冲区了, 只是消费者 B 还没来得及处理
此时的状态:
- 消费者 A: 快速处理完了 1,3,5,7,9. 手里没活了, 向 mq 要任务, 但此时 mq 中已经没有 ready 的任务了
- 消费者 B: 正在处理 2, 但是 4,6,8,10 已经被 B 霸占了
- 结果: A 只能闲着, 看着 B 忙死
给一个消费者分发的上限是多少?
上限取决于
prefetch_count的设置
- 未设置 prefetch_count (默认情况 / 0)
- 上限: 无限
- 行为: 若队列中有 1 万条消息, 消费者 A 连上来, mq 会视作 A 胃口无限大, 它会以最快的速度将这 1 万条消息全部推送到 A
- 结果: mq 服务端的队列瞬间变空 (状态全为 unacked), 所有压力转移到消费者 A
- 设置了 prefetch_count = 5
- 上限: 5 条
- 行为: mq 维护一个计数器. 它发送 5 条消息给 A 后, 就会暂停发送. 直到 A 处理完其中一条并回复了 ack, mq 发现 (5-1=4 < 5), 才会把第 6 条发过去
以 python pika 库 为例
# 1. 设置 QoS,每次只预取 1 条
channel.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
print("收到任务")
# 模拟耗时操作
do_heavy_work(body)
# 2. 关键:处理完后手动发送 ACK
ch.basic_ack(delivery_tag=method.delivery_tag)
# 3. 消费时关闭自动确认 (auto_ack=False)
channel.basic_consume(queue='task_queue', on_message_callback=callback, auto_ack=False)总结
标准做法:
- 绝不使用默认的无限领取
- 设置合理的 prefetch_count
- 追求极致公平 (防止阻塞): 设为 1, 适合任务耗时, 波动极大的场景. 缺点是 网络交互频繁, 吞吐量低
- 追求吞吐量 (批量处理): 设为 50~500, 适合任务处理快且耗时均匀的场景, 可以利用客户端内存缓存, 减少网络等待