Python asyncio.Queue 如何实现优先级队列

asyncio.Queue不支持优先级,需用asyncio.PriorityQueue;后者是其子类,基于heapq实现,要求put/get时传入(priority, item)元组,数字越小优先级越高。

asyncio.Queue 本身不支持优先级,得换用 asyncio.PriorityQueue

Python 标准库的 asyncio.Queue 是 FIFO(先进先出)队列,没有内置优先级逻辑。真要按优先级消费任务,必须用 asyncio.PriorityQueue —— 它是 asyncio.Queue 的子类,底层基于 heapq,要求你传入可比较的元素(通常是 (priority, item) 元组)。

put() 和 get() 要配合元组结构使用

asyncio.PriorityQueue 里塞东西,不能直接 put(item),得显式构造带优先级的元组。数字越小,优先级越高(和 heapq 一致)。常见错误是忘记包装或传错顺序:

  • await pq.put((5, "low-pri task"))
  • await pq.put("task") ❌ —— 会报 TypeError: '
  • await pq.put(("high", 1)) ❌ —— 字符串和整数无法比较,且顺序反了

如果业务逻辑中优先级是枚举或字符串,建议提前映射为整数,比如 {"critical": 0, "normal": 10, "debug": 99}

注意 priority 值相同时的“不稳定排序”

asyncio.PriorityQueue 在优先级相等时,不保证插入顺序(即不是稳定排序)。例如连续 put 两个 (1, "a")(1, "b"),get() 可能先返回 b。若需严格 FIFO 补偿,可以加个自增计数器:

import asyncio
import itertools

counter = itertools.count() pq = asyncio.PriorityQueue()

插入时:(priority, count, item)

await pq.put((1, next(counter), "task-a")) await pq.put((1, next(counter), "task-b"))

这样相同 priority 下靠 count 保序,但代价是内存多存一个整数、比较开销略增。

别混用 sync 和 async 的优先级队列

有人试图用 queue.PriorityQueue(同步版)配合 asyncio.to_t

hreadloop.run_in_executor 来“曲线救国”,这会引入线程切换开销,且容易因竞态导致 get() 阻塞行为不符合预期。除非你明确需要跨线程调度,否则纯 async 场景务必坚持用 asyncio.PriorityQueue

真正麻烦的是优先级动态变化——asyncio.PriorityQueue 不支持修改已入队元素的 priority,只能 cancel 任务并重新入队。这点常被忽略,实际做重调度时得自己维护 pending 任务表。