Python接口请求如何优化_批量请求与异常处理技巧【教学】

Python批量请求需用Session复用连接、ThreadPoolExecutor并发、分层捕获异常并统一收口结果。关键包括:连接池调优、线程安全复用Session、按错误类型重试、结构化存储成败结果。

Python调用接口时,批量请求和异常处理是提升稳定性和效率的关键。单纯用requests.get()逐个发请求,既慢又容易崩——尤其面对几十上百个URL时。核心思路就两条:并发控制 + 稳健容错。

用requests.Session复用连接,减少开销

每次新建requests.get()都会重建TCP连接、TLS握手,耗时明显。换成Session对象,能自动复用底层连接,尤其适合批量请求同一域名的场景。

说明:
- Session会缓存连接池,默认保持10个空闲连接;
- 同一Session发出的请求,若Host相同,大概率复用已有连接;
- 配合mount可为HTTP/HTTPS定制Adapter(比如设置最大连接数)。

建议写法:

import requests

session = requests.Session()

可选:调整连接池大小(避免TooManyRedirects或ConnectionPoolSizeError)

adapter = requests.adapters.HTTPAdapter(pool_connections=20, pool_maxsize=20) session.mount('http://', adapter) session.mount('https://', adapter)

urls = ['https://www./link/cf24f44a79866351337c1b317ffdc18d', 'https://www./link/abc58d2523df2aea708a509fbd201437'] for url in urls: try: resp = session.get(url, timeout=5) resp.raise_for_status() print(resp.json()) except requests.exceptions.RequestException as e: print(f"请求失败 {url}:{e}")

并发请求别硬上asyncio,先试试ThreadPoolExecutor

多数业务接口是IO密集型,用多线程比多进程更轻量,也比手写async更易维护。Python标准库concurrent.futures.ThreadPoolExecutor足够应对几百以内的并发量。

关键点:
- 控制max_workers(通常设为CPU核数×2~5,或根据目标服务器承载力调低);
- 每个worker内仍用Session,避免连接池竞争;
- 用as_completed实时获取结果,不阻塞等待全部完成。

示例结构:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_one(session, url): try: resp = session.get(url, timeout=8) resp.raise_for_status() return {'url': url, 'status': 'success', 'data': resp.json()} except Exception as e: return {'url': url, 'status': 'error', 'error': str(e)}

复用Session实例(注意:Session不是线程安全的,但用于GET基本无问题;如需绝对安全,可在每个worker里新建)

with ThreadPoolExecutor(max_workers=10) as executor: futures = [executor.submit(fetch_one, session, url) for url in urls] for future in as_completed(futures): result = future.result() if result['status'] == 'success': print("✅", result['url']) else: print("❌", result['url'], result['error'])

异常要分层捕获,别只靠try-except包全局

接口请求失败原因多样,统一用一个except Exception掩盖细节,调试和重试策略都会失效。应按错误类型分层处理:

  • 网络层异常:如ConnectionErrorTimeout——适合立即重试(加退避);
  • 协议/响应异常:如HTTPError(4xx/5xx)——4xx一般不重试,5xx可考虑重试;
  • 解析异常:如JSONDecodeError——说明返回非预期格式(可能是HTML错误页),需记录原始resp.text排查;
  • 业务逻辑异常:如API返回{"code": 4001, "msg": "余额不足"}——属于正常业务流,不应进except,而应在response后判断字段。

推荐做法:封装一个带基础重试和分类日志的请求函数:

import time
import logging
from requests.exceptions import ConnectionError, Timeout, HTTPError

def safe_get(session, url, max_retries=2, backoff_factor=1): for i in range(max_retries + 1): try: resp = session.get(url, timeout=10) resp.raise_for_status() return resp except ConnectionError: if i == max_retries: raise logging.warning(f"连接失败 {url},{backoff_factor * (2 i)}s后重试") time.sleep(backoff_factor * (2 * i)) except Timeout: if i == max_retries: raise logging.warning(f"超时 {url},重试中...") time.sleep(backoff_factor (2 i)) except HTTPError as e: if resp.status_code >= 500 and i < max_retries: logging.warning(f"服务端错误 {url}({resp.status_code}),重试...") time.sleep(backoff_factor * (2 ** i)) continue raise # 4xx直接抛出

批量结果要统一收口,别让异常中断整个流程

批量请求的目标不是“全成功”,而是“可知可控”——哪怕100个里失败20个,也要明确知道哪20个、为什么失败、返回什么原始信息。

建议:
- 结果用字典或命名元组存储,含urlstatus('success'/'failed')、response(成功时为json dict,失败时为exception或原始resp);
- 失败项单独写入log文件或数据库,包含urltimestamperror_typeerror_msgresponse_text(如有);
- 最终汇总打印成功数/失败数/平均耗时,方便快速评估批次质量。

不复杂但容易忽略。