Python 异步编程入门教程:asyncio 与高并发实战
Python 异步编程是提升并发性能的关键技术,asyncio 库让单线程处理成千上万并发请求成为可能。本文从原理到实战,带你掌握异步编程的核心精髓。
一、为什么需要异步编程?
在传统的同步编程模型中,当程序执行 I/O 操作(如网络请求、文件读写、数据库查询)时,线程会被阻塞,等待操作完成才能继续执行。这种模式下,一个线程同一时间只能处理一个任务,效率低下。
假设你要请求 100 个网页:
# 同步方式:串行执行,总耗时 = 所有请求时间之和
import requests
import time
def fetch_sync(urls):
results = []
start = time.time()
for url in urls:
response = requests.get(url) # 阻塞等待
results.append(response.status_code)
print(f"同步耗时: {time.time() - start:.2f}秒")
return results
# 100 个请求,每个 1 秒,总耗时约 100 秒
而异步编程的核心思想是:当遇到 I/O 操作时,不等待其完成,而是转去执行其他任务。等 I/O 完成后再回来处理结果。这样单线程也能同时处理多个任务:
# 异步方式:并发执行,总耗时 ≈ 最慢的那个请求
import asyncio
import aiohttp
import time
async def fetch_async(urls):
results = []
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [fetch_one(session, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f"异步耗时: {time.time() - start:.2f}秒")
return results
async def fetch_one(session, url):
async with session.get(url) as response:
return response.status
# 100 个请求,每个 1 秒,总耗时约 1-2 秒!
这就是异步编程的威力:用单线程实现高并发,大大提升 I/O 密集型任务的效率。
二、asyncio 核心概念
2.1 协程(Coroutine)
协程是异步编程的基本单位。在 Python 中,使用 async def 定义的函数就是协程函数,调用它返回协程对象:
import asyncio
# 定义协程函数
async def say_hello():
print("Hello")
await asyncio.sleep(1) # 模拟 I/O 操作
print("World")
# 调用协程函数,返回协程对象(不会立即执行)
coro = say_hello()
print(type(coro)) # <class 'coroutine'>
# 运行协程
asyncio.run(say_hello())
2.2 await 关键字
await 用于等待一个协程完成,并获取其返回值。它只能在 async def 函数内部使用:
async def get_data():
await asyncio.sleep(1) # 等待模拟的 I/O 操作
return {"status": "ok", "data": [1, 2, 3]}
async def main():
# await 暂停当前协程,等待 get_data 完成
result = await get_data()
print(result) # {"status": "ok", "data": [1, 2, 3]}
asyncio.run(main())
关键理解:await 不是阻塞,而是让出控制权。当协程执行到 await 时,它会暂停自己,事件循环转而去执行其他协程。
2.3 事件循环(Event Loop)
事件循环是异步编程的心脏,负责调度和执行协程。它维护一个任务队列,不断从队列中取出任务执行:
import asyncio
async def task(name, delay):
print(f"任务 {name} 开始")
await asyncio.sleep(delay)
print(f"任务 {name} 完成")
return f"结果-{name}"
async def main():
# 创建多个任务,并发执行
tasks = [
asyncio.create_task(task("A", 2)),
asyncio.create_task(task("B", 1)),
asyncio.create_task(task("C", 3)),
]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
print(f"所有结果: {results}")
asyncio.run(main())
# 输出顺序:B -> A -> C(按完成时间,非启动顺序)
2.4 任务(Task)
Task 是协程的包装器,让协程可以在事件循环中调度执行:
async def my_coroutine():
await asyncio.sleep(1)
return "完成"
async def main():
# 方式1:create_task(推荐)
task1 = asyncio.create_task(my_coroutine())
# 方式2:ensure_future
task2 = asyncio.ensure_future(my_coroutine())
# 等待任务完成
result1 = await task1
result2 = await task2
print(result1, result2)
asyncio.run(main())
三、并发执行模式
3.1 gather:并行等待多个协程
asyncio.gather() 是最常用的并发执行方式,它会并行启动多个协程并等待全部完成:
import asyncio
import time
async def download_file(file_id):
print(f"开始下载文件 {file_id}")
await asyncio.sleep(1) # 模拟下载
print(f"文件 {file_id} 下载完成")
return f"文件{file_id}的内容"
async def main():
start = time.time()
# 并行下载 5 个文件
results = await asyncio.gather(
download_file(1),
download_file(2),
download_file(3),
download_file(4),
download_file(5),
)
print(f"总耗时: {time.time() - start:.2f}秒") # 约 1 秒
print(f"结果数量: {len(results)}")
asyncio.run(main())
3.2 as_completed:按完成顺序获取结果
当需要按完成顺序处理结果时,使用 asyncio.as_completed():
import asyncio
import random
async def fetch_data(source):
delay = random.uniform(0.5, 2)
await asyncio.sleep(delay)
return f"数据来自 {source},耗时 {delay:.2f}秒"
async def main():
sources = ["API-1", "API-2", "API-3", "API-4"]
# 创建任务列表
tasks = [asyncio.create_task(fetch_data(s)) for s in sources]
# 按完成顺序处理结果
for coro in asyncio.as_completed(tasks):
result = await coro
print(f"收到: {result}")
asyncio.run(main())
3.3 wait:更灵活的等待控制
asyncio.wait() 提供更精细的控制,可以设置超时和返回条件:
import asyncio
async def long_task(n):
await asyncio.sleep(n)
return f"任务耗时 {n} 秒"
async def main():
tasks = {
asyncio.create_task(long_task(1)),
asyncio.create_task(long_task(2)),
asyncio.create_task(long_task(5)),
}
# 等待第一个完成就返回
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"已完成: {len(done)}")
print(f"进行中: {len(pending)}")
# 取消未完成的任务
for task in pending:
task.cancel()
asyncio.run(main())
3.4 信号量控制并发数
当需要限制并发数量(如 API 有频率限制)时,使用 Semaphore:
import asyncio
import aiohttp
async def fetch_with_limit(semaphore, session, url):
async with semaphore: # 获取信号量,限制并发
async with session.get(url) as response:
return await response.text()
async def main():
# 限制最多 5 个并发请求
semaphore = asyncio.Semaphore(5)
urls = [f"https://httpbin.org/delay/1" for _ in range(20)]
async with aiohttp.ClientSession() as session:
tasks = [
fetch_with_limit(semaphore, session, url)
for url in urls
]
results = await asyncio.gather(*tasks)
print(f"完成 {len(results)} 个请求")
asyncio.run(main())
四、实战案例:异步爬虫
下面是一个完整的异步爬虫示例,展示如何高效爬取多个网页:
import asyncio
import aiohttp
import time
from typing import List, Dict
import json
class AsyncSpider:
"""异步爬虫类"""
def __init__(self, max_concurrent: int = 10):
self.semaphore = asyncio.Semaphore(max_concurrent)
self.results: List[Dict] = []
async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Dict:
"""抓取单个页面"""
async with self.semaphore:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
html = await response.text()
return {
"url": url,
"status": response.status,
"length": len(html),
"success": True
}
except Exception as e:
return {
"url": url,
"error": str(e),
"success": False
}
async def crawl(self, urls: List[str]) -> List[Dict]:
"""并发爬取多个 URL"""
connector = aiohttp.TCPConnector(limit=20) # 连接池大小
async with aiohttp.ClientSession(connector=connector) as session:
tasks = [self.fetch_page(session, url) for url in urls]
self.results = await asyncio.gather(*tasks)
return self.results
def print_stats(self):
"""打印统计信息"""
success = sum(1 for r in self.results if r["success"])
failed = len(self.results) - success
total_size = sum(r.get("length", 0) for r in self.results if r["success"])
print(f"\n=== 爬取统计 ===")
print(f"成功: {success}, 失败: {failed}")
print(f"总数据量: {total_size / 1024:.2f} KB")
async def main():
# 测试 URL 列表
test_urls = [
"https://httpbin.org/get",
"https://httpbin.org/ip",
"https://httpbin.org/headers",
"https://httpbin.org/user-agent",
"https://httpbin.org/json",
] * 4 # 20 个请求
spider = AsyncSpider(max_concurrent=5)
start = time.time()
results = await spider.crawl(test_urls)
elapsed = time.time() - start
spider.print_stats()
print(f"总耗时: {elapsed:.2f}秒")
print(f"平均每请求: {elapsed/len(test_urls)*1000:.0f}毫秒")
if __name__ == "__main__":
asyncio.run(main())
五、异步上下文管理器与迭代器
5.1 异步上下文管理器
使用 async with 进入异步上下文管理器:
import asyncio
class AsyncDatabase:
"""模拟异步数据库连接"""
async def __aenter__(self):
print("建立数据库连接...")
await asyncio.sleep(0.1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("关闭数据库连接...")
await asyncio.sleep(0.1)
async def query(self, sql):
await asyncio.sleep(0.05)
return f"查询结果: {sql}"
async def main():
async with AsyncDatabase() as db:
result = await db.query("SELECT * FROM users")
print(result)
asyncio.run(main())
5.2 异步迭代器
使用 async for 遍历异步迭代器:
import asyncio
class AsyncRange:
"""异步范围迭代器"""
def __init__(self, count):
self.count = count
def __aiter__(self):
self.current = 0
return self
async def __anext__(self):
if self.current >= self.count:
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟异步获取
value = self.current
self.current += 1
return value
async def main():
async for num in AsyncRange(5):
print(f"获取到: {num}")
asyncio.run(main())
六、常见陷阱与最佳实践
6.1 不要在异步代码中使用同步阻塞调用
# ❌ 错误:会阻塞事件循环
async def bad_example():
time.sleep(5) # 阻塞整个事件循环!
return "完成"
# ✅ 正确:使用异步版本
async def good_example():
await asyncio.sleep(5) # 不阻塞,让出控制权
return "完成"
6.2 CPU 密集型任务要用线程池
异步适合 I/O 密集型任务,CPU 密集型任务应放到线程池:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def cpu_intensive(n):
"""CPU 密集型计算"""
return sum(i * i for i in range(n))
async def main():
# 在进程池中运行 CPU 密集型任务
loop = asyncio.get_event_loop()
with ProcessPoolExecutor() as executor:
result = await loop.run_in_executor(
executor, cpu_intensive, 10_000_000
)
print(f"计算结果: {result}")
asyncio.run(main())
6.3 正确处理异常
import asyncio
async def may_fail(n):
if n == 3:
raise ValueError(f"数字 {n} 不合法")
await asyncio.sleep(0.1)
return n * 2
async def main():
# 方式1:gather 时返回异常而非抛出
results = await asyncio.gather(
*[may_fail(i) for i in range(5)],
return_exceptions=True # 关键!
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务 {i} 失败: {result}")
else:
print(f"任务 {i} 成功: {result}")
asyncio.run(main())
七、总结
Python 异步编程的核心要点:
- 适用场景:I/O 密集型任务(网络请求、文件读写、数据库查询)
- 三大核心:协程(async def)、事件循环、任务(Task)
- 关键字:async 定义协程,await 让出控制权
- 并发工具:gather 并行执行、as_completed 按完成顺序处理、Semaphore 限制并发
- 避坑指南:不用阻塞调用、CPU 任务用线程池、正确处理异常
掌握异步编程,让你的 Python 程序性能提升 10 倍不再是梦。在实际项目中,结合 aiohttp、aiomysql、aioredis 等异步库,可以构建高性能的 Web 服务、爬虫和数据处理管道。
下一篇文章,我们将深入探讨 FastAPI 异步 Web 框架的实战应用,敬请期待!
本文链接:https://www.kkkliao.cn/?id=978 转载需授权!
版权声明:本文由廖万里的博客发布,如需转载请注明出处。



手机流量卡
免费领卡
号卡合伙人
产品服务
关于本站
