异步爬虫是代理配置最容易翻车的地方。用 requests 时你一次轮换一个 IP,发完就过;一上 asyncio,瞬间几百个请求同时在飞,每个都要自己的出口、自己的会话粘性、被 403 时自己的重试。弄错了要么把一个 IP 打到封,要么在流程中途轮换把每个登录会话都撕掉。
这篇讲在高并发下真正扛得住的写法:httpx 和 aiohttp 的逐请求轮换、有上限的协程池、有状态流程的 sticky 会话、以及识别拦截的重试——都是可直接粘贴的代码。
同步爬虫一次只碰一个代理,所以「每个请求轮换」天然正确。异步一下打破三个前提:
httpx.AsyncClient 一个客户端绑一个代理,所以轮换是按请求选代理、走一个按出口分键的小客户端池:
import asyncio, itertools, httpx
PROXIES = [
"socks5h://USERNAME:[email protected]:913",
"socks5h://USERNAME:[email protected]:913",
"socks5h://USERNAME:[email protected]:913",
]
pool = itertools.cycle(PROXIES)
# 每个出口一个可复用客户端(连接池保持不变)
clients = {p: httpx.AsyncClient(proxy=p, timeout=30) for p in PROXIES}
async def fetch(url):
proxy = next(pool)
r = await clients[proxy].get(url)
return r.status_code, r.text
async def main(urls):
results = await asyncio.gather(*(fetch(u) for u in urls))
for client in clients.values():
await client.aclose()
return results
每个出口复用一个客户端,能让 HTTP/2 和连接池一直活着,而不是每个请求都付一次新握手。注意 httpx>=0.28 把客户端上的 proxies= 改名成了 proxy=。
aiohttp 这里更简单——一个 ClientSession 每个请求接 proxy= 参数,所以你内联轮换:
import asyncio, itertools, aiohttp
pool = itertools.cycle(PROXIES) # http:// 或 socks5h://(SOCKS 需 aiohttp-socks)
async def fetch(session, url):
proxy = next(pool)
async with session.get(url, proxy=proxy, timeout=aiohttp.ClientTimeout(total=30)) as r:
return r.status, await r.text()
async def main(urls):
async with aiohttp.ClientSession() as session:
return await asyncio.gather(*(fetch(session, u) for u in urls))
aiohttp 原生支持 HTTP 代理;要用 socks5h:// 就加 aiohttp-socks,传一个 ProxyConnector。
无上限的 gather 是把你池子里每个 IP 一次性打到被标记的最快方式。设个上限,让每个出口承载一个真人合理的并行数:
sem = asyncio.Semaphore(10) # 最多同时 10 个
async def fetch_capped(session, url):
async with sem:
return await fetch(session, url)
经验值:把并发压在你拥有的不同出口数量及以下,别在单个住宅 IP 上堆很多并行请求。
轮换是为了进门;sticky 会话是为了留下。登录、购物车、任何 cookie 绑定的流程都必须整段保持一个出口。按任务(而非按请求)固定代理:
async def run_account(account, proxy):
# 这个账号的每一步都走同一个出口 IP
async with httpx.AsyncClient(proxy=proxy, timeout=30) as client:
await client.post("https://target.site/login", data=account.creds)
await client.get("https://target.site/dashboard")
await client.post("https://target.site/cart", json=account.order)
async def main(accounts):
# 每个账号一条 sticky 出口,账号之间并发
await asyncio.gather(*(run_account(a, p)
for a, p in zip(accounts, itertools.cycle(PROXIES))))
用支持 sticky 住宅会话的服务商,让同一出口在任务生命周期内保持不被悄悄轮换。
高并发下你不能信 200——反爬系统会返回 200 但正文是质询页。识别拦截,把单个失败任务在新出口上重试:
BLOCK_MARKERS = ("just a moment", "/cdn-cgi/challenge-platform",
"datadome", "px-captcha", "access denied")
def looks_blocked(status, text):
return status in (403, 429, 503) or any(m in text[:4000].lower() for m in BLOCK_MARKERS)
async def fetch_retry(url, attempts=4):
for _ in range(attempts):
proxy = next(pool)
async with httpx.AsyncClient(proxy=proxy, timeout=30) as c:
r = await c.get(url)
if not looks_blocked(r.status_code, r.text):
return r
await asyncio.sleep(0.5)
raise RuntimeError(f"换 {attempts} 个出口仍被拦:{url}")
有一点异步解决不了:httpx 和 aiohttp 都坐在 Python 的 TLS 栈上,发的 JA3 没有真实浏览器会产生。完美的轮换池跑在干净住宅 IP 上,握手时照样被指纹识别成自动化。把异步轮换和 TLS 伪装搭配用——见 用 curl_cffi 绕过 TLS 指纹,并在 check.jibaoproxy.com 验你出口的 JA3 + ASN。
新用户注册即送500M免费流量,首次充值额外加赠,活动期间限时开放。