Python爬虫请求调度设计_任务队列实现思路【教程】

直接用 queue.Queue 易卡死,因其 get() 默认无限阻塞且无超时/异常穿透机制;asyncio.Queue 需配 timeout 和 task_done;Redis 用 zset + bzpopmin 支持优先级与持久化;须通过 full() 或 zcard 实现反压控制。

为什么直接用 queue.Queue 在爬虫里容易卡死

多线程爬虫中,如果直接用标准库的 queue.Queue 做任务分发,常出现消费者线程全部阻塞在 get()、生产者却因异常退出而不再放新任务——队列既没满也没空,但整个调度就僵住了。根本原因是它默认的阻塞行为缺乏超时兜底和异常穿透机制。

  • get(block=True) 会无限等待,一旦上游断流,线程就挂起不响应中断
  • 没有内置重试计数或失败归档逻辑,单个坏 URL 可能导致任务永久滞留
  • 无法跨进程共享,后续想加分布式调度就得重写整套队列层

asyncio.Queue 实现轻量异步调度的关键配置

对中小规模 HTTP 爬取(比如每秒 10–50 请求),asyncio.Queue 比线程队列更省资源,但必须显式控制生命周期,否则协程会泄漏。

import asyncio

async def worker(queue: asyncio.Queue, session): while True: try: url = await asyncio.wait_for(queue.get(), timeout=3.0) # 必须设超时 async with session.get(url) as resp:

处理响应...

        queue.task_done()  # 必须调用,否则 join() 不返回
    except asyncio.TimeoutError:
        break  # 超时即退出,避免死循环
    except Exception as e:
        print(f"Worker error on {url}: {e}")
        queue.task_done()  # 错误也要标记完成,否则队列卡住
  • asyncio.wait_for(..., timeout=...) 是刚需,不能依赖 get_nowait() —— 它抛 queue.Empty 异常,但协程里没地方 catch
  • 每个 get() 后必须配对 task_done(),哪怕出错也要调,否则 queue.join() 永远不结束
  • 不要在 worker 里用 await queue.put(...) 回填重试任务——容易引发循环等待,应由独立的 retry manager 处理

需要持久化或扩缩容?绕过内存队列直连 Redis 的最小可行方案

当爬虫要跑几天、或需横向加机器时,内存队列不可靠。用 redis-pylpop/rpush 组合比引入 Celery 更轻,且天然支持失败重入队。

import redis
import json

r = redis.Redis()

def add_task(url: str, priority: int = 0): payload = json.dumps({"url": url, "retry": 0}) r.zadd("pending_tasks", {payload: priority}) # 用有序集合支持优先级

def get_task(timeout=1) -> dict | None:

阻塞式取一个,超时返回 None

result = r.bzpopmin("pending_tasks", timeout=timeout)
if result:
    return json.loads(result[1])
return None
  • 别用 list 类型的 lpop —— 无法去重、不支持优先级、无超时原语;zsetstream 更稳妥
  • bzpopmin 是原子操作,避免“取到但崩溃未处理”导致任务丢失
  • 任务体里必须带 retry 字段,失败时 r.zadd("pending_tasks", {payload: time.time() + 60}) 实现指数退避

调度器里最容易被忽略的「反压」信号:如何让生产者感知下游拥堵

很多爬虫把 URL 批量塞进队列就不管了,结果内存暴涨 OOM。真正的调度必须让生产者知道“慢点来”。

  • queue.qsize() 做阈值判断不可靠(多线程下非原子),改用 queue.full() + time.sleep() 组合
  • 异步场景下,在 put() 前加 if queue.qsize() > MAX_SIZE: await asyncio.sleep(0.1)
  • Redis 方案中,用 r.zcard("pending_tasks") 监控积压量,超过阈值则暂停解析新页面链接

队列不是管道,是缓冲区;缓冲区满了还硬塞,系统就从调度问题变成运维事故。