Python 异步编程完全指南:从入门到精通
异步编程是现代 Python 开发中提升并发性能的核心技术。掌握 async/await 语法,理解事件循环机制,能够让你的程序在 I/O 密集型场景下性能提升数倍甚至数十倍。
一、核心概念
在深入异步编程之前,我们需要理解几个关键概念:
1. 同步 vs 异步
同步编程是传统的执行方式,代码按顺序一行一行执行,遇到耗时操作(如网络请求、文件读写)时会阻塞,等待操作完成后才继续执行下一行。这就像在餐厅点餐,你必须等前一桌客人点完餐才能轮到你。
异步编程则不同,当遇到耗时操作时,程序不会傻等,而是先去执行其他任务,等耗时操作完成后再回来处理结果。这就像在快餐店点餐后拿个号码牌,你可以先找座位、玩手机,等叫号了再去取餐。
2. 协程(Coroutine)
协程是异步编程的核心概念。它是一种轻量级的"线程",由程序自身控制切换,而不是由操作系统调度。Python 使用 async/await 语法来定义和使用协程。
import asyncio
# 定义一个协程函数
async def say_hello():
print("Hello")
# 模拟耗时操作,暂停1秒
await asyncio.sleep(1)
print("World")
# 运行协程
asyncio.run(say_hello())
3. 事件循环(Event Loop)
事件循环是异步编程的"心脏"。它负责调度和执行协程,管理 I/O 事件,在合适的时机切换任务。可以把事件循环想象成一个不停旋转的轮子,每个任务就像轮子上的一个槽位,轮子转到哪个槽位就执行哪个任务。
二、基础语法详解
1. async 关键字
async 用于定义协程函数。被 async 修饰的函数调用后返回一个协程对象,而不是立即执行。
import asyncio
async def my_coroutine():
print("这是一个协程")
return "协程返回值"
# 调用协程函数,得到协程对象
coro = my_coroutine()
print(type(coro)) #
# 运行协程
result = asyncio.run(my_coroutine())
print(result) # 协程返回值
2. await 关键字
await 用于等待一个可等待对象(协程、Task、Future)完成,并获取其返回值。await 只能在 async 函数内部使用。
import asyncio
async def fetch_data(url):
# 模拟网络请求耗时
await asyncio.sleep(2)
return f"来自 {url} 的数据"
async def main():
# await 会暂停当前协程,直到 fetch_data 完成
data = await fetch_data("https://example.com")
print(data)
asyncio.run(main())
3. asyncio.run() 启动入口
asyncio.run() 是 Python 3.7+ 推荐的启动异步程序的方式。它会自动创建事件循环、运行协程、关闭事件循环。
import asyncio
async def main():
print("异步程序开始")
await asyncio.sleep(1)
print("异步程序结束")
# 推荐方式:自动管理事件循环
asyncio.run(main())
三、并发执行多个任务
异步编程的真正威力在于并发执行多个任务。当多个任务都需要等待 I/O 时,可以让它们同时进行,而不是一个接一个。
1. asyncio.gather() 并发执行
import asyncio
import time
async def download_file(name, delay):
print(f"开始下载 {name}")
await asyncio.sleep(delay) # 模拟下载耗时
print(f"下载完成 {name}")
return f"{name} 的内容"
async def main():
start = time.time()
# 并发执行3个下载任务
results = await asyncio.gather(
download_file("文件A", 2),
download_file("文件B", 3),
download_file("文件C", 1),
)
print(f"总耗时: {time.time() - start:.1f}秒")
print(f"结果: {results}")
asyncio.run(main())
# 输出:
# 开始下载 文件A
# 开始下载 文件B
# 开始下载 文件C
# 下载完成 文件C
# 下载完成 文件A
# 下载完成 文件B
# 总耗时: 3.0秒(而不是 2+3+1=6秒)
2. asyncio.create_task() 创建任务
import asyncio
async def background_task(name):
for i in range(5):
await asyncio.sleep(1)
print(f"{name}: 第 {i+1} 次执行")
async def main():
# 创建后台任务,立即返回,不等待完成
task1 = asyncio.create_task(background_task("任务1"))
task2 = asyncio.create_task(background_task("任务2"))
print("主任务开始")
await asyncio.sleep(3)
print("主任务结束")
# 等待所有任务完成
await asyncio.gather(task1, task2)
asyncio.run(main())
3. asyncio.wait() 灵活控制
import asyncio
async def task(name, delay):
await asyncio.sleep(delay)
return f"{name} 完成"
async def main():
tasks = [
asyncio.create_task(task("A", 1)),
asyncio.create_task(task("B", 2)),
asyncio.create_task(task("C", 3)),
]
# 等待第一个完成的任务
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.FIRST_COMPLETED
)
print(f"第一个完成的任务: {done.pop().result()}")
# 取消剩余任务
for t in pending:
t.cancel()
asyncio.run(main())
四、异步网络请求实战
异步编程最常见的场景是网络请求。使用 aiohttp 库可以高效地并发请求多个 URL。
import asyncio
import aiohttp
import time
async def fetch_url(session, url):
"""异步获取单个URL的内容"""
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
return await response.text()
except Exception as e:
return f"请求 {url} 失败: {e}"
async def fetch_all_urls(urls):
"""并发获取多个URL"""
async with aiohttp.ClientSession() as session:
tasks = [fetch_url(session, url) for url in urls]
results = await asyncio.gather(*tasks)
return results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/get",
"https://httpbin.org/ip",
]
start = time.time()
results = await fetch_all_urls(urls)
for i, result in enumerate(results):
print(f"URL {i+1}: {len(result)} 字符")
print(f"总耗时: {time.time() - start:.2f}秒")
asyncio.run(main())
五、异步文件操作
使用 aiofiles 库可以进行异步文件读写,避免阻塞事件循环。
import asyncio
import aiofiles
async def write_file(filename, content):
"""异步写入文件"""
async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
await f.write(content)
print(f"已写入 {filename}")
async def read_file(filename):
"""异步读取文件"""
async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
content = await f.read()
return content
async def main():
# 并发写入多个文件
await asyncio.gather(
write_file("file1.txt", "内容1"),
write_file("file2.txt", "内容2"),
write_file("file3.txt", "内容3"),
)
# 并发读取
contents = await asyncio.gather(
read_file("file1.txt"),
read_file("file2.txt"),
read_file("file3.txt"),
)
print(contents)
asyncio.run(main())
六、异步上下文管理器与迭代器
1. 异步上下文管理器
import asyncio
class AsyncTimer:
"""异步计时器上下文管理器"""
async def __aenter__(self):
self.start = asyncio.get_event_loop().time()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
elapsed = asyncio.get_event_loop().time() - self.start
print(f"耗时: {elapsed:.2f}秒")
async def main():
async with AsyncTimer():
await asyncio.sleep(1)
print("执行中...")
asyncio.run(main())
2. 异步迭代器
import asyncio
class AsyncRange:
"""异步范围迭代器"""
def __init__(self, count):
self.count = count
def __aiter__(self):
self.i = 0
return self
async def __anext__(self):
if self.i >= self.count:
raise StopAsyncIteration
await asyncio.sleep(0.1) # 模拟异步操作
self.i += 1
return self.i
async def main():
async for num in AsyncRange(5):
print(num)
asyncio.run(main())
七、异步队列与生产者消费者模式
import asyncio
import random
async def producer(queue, producer_id):
"""生产者:向队列添加数据"""
for i in range(5):
item = f"生产者{producer_id}-商品{i}"
await queue.put(item)
print(f"[生产] {item}")
await asyncio.sleep(random.uniform(0.1, 0.5))
async def consumer(queue, consumer_id):
"""消费者:从队列获取数据"""
while True:
item = await queue.get()
print(f"[消费{consumer_id}] 处理: {item}")
await asyncio.sleep(random.uniform(0.2, 0.8))
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=10)
# 启动生产者
producers = [
asyncio.create_task(producer(queue, i))
for i in range(2)
]
# 启动消费者
consumers = [
asyncio.create_task(consumer(queue, i))
for i in range(3)
]
# 等待生产者完成
await asyncio.gather(*producers)
# 等待队列清空
await queue.join()
# 取消消费者
for c in consumers:
c.cancel()
asyncio.run(main())
八、常见陷阱与最佳实践
1. 避免阻塞事件循环
在异步函数中使用同步的阻塞操作(如 time.sleep、requests.get)会阻塞整个事件循环,导致所有异步任务都无法执行。
# 错误示范
import time
async def bad_example():
time.sleep(5) # 阻塞整个事件循环5秒!
# 正确做法
import asyncio
async def good_example():
await asyncio.sleep(5) # 不阻塞,允许其他任务运行
2. 正确处理异常
import asyncio
async def may_fail():
raise ValueError("出错了")
async def main():
try:
await may_fail()
except ValueError as e:
print(f"捕获异常: {e}")
# 或者使用 return_exceptions=True
results = await asyncio.gather(
may_fail(),
asyncio.sleep(1),
return_exceptions=True
)
print(results) # [ValueError('出错了'), None]
asyncio.run(main())
3. 限制并发数量
使用信号量(Semaphore)限制同时运行的任务数量,避免资源耗尽。
import asyncio
async def limited_task(semaphore, task_id):
async with semaphore:
print(f"任务 {task_id} 开始")
await asyncio.sleep(1)
print(f"任务 {task_id} 完成")
async def main():
# 最多同时运行3个任务
semaphore = asyncio.Semaphore(3)
tasks = [
limited_task(semaphore, i)
for i in range(10)
]
await asyncio.gather(*tasks)
asyncio.run(main())
九、性能对比实测
import asyncio
import time
import requests # 同步请求库
import aiohttp # 异步请求库
# 同步版本
def sync_fetch(urls):
start = time.time()
results = []
for url in urls:
response = requests.get(url)
results.append(len(response.text))
return time.time() - start
# 异步版本
async def async_fetch(urls):
start = time.time()
async with aiohttp.ClientSession() as session:
tasks = [session.get(url) for url in urls]
responses = await asyncio.gather(*tasks)
results = [len(await r.text()) for r in responses]
return time.time() - start
# 测试10个请求
urls = ["https://httpbin.org/delay/1"] * 10
# 同步:约10秒(串行执行)
# 异步:约1秒(并发执行)
十、总结
Python 异步编程是提升 I/O 密集型应用性能的利器。核心要点回顾:
1. 理解协程:async 定义协程,await 等待结果
2. 善用并发:asyncio.gather() 并发执行多个任务
3. 避免阻塞:使用异步版本的库,不要在异步函数中使用同步阻塞操作
4. 处理异常:正确捕获异常,使用 return_exceptions=True 获取所有结果
5. 控制并发:使用 Semaphore 限制并发数量,保护资源
异步编程的学习曲线稍陡,但一旦掌握,你将能够编写出高性能、高并发的 Python 应用。从爬虫、API 服务到实时通信,异步编程的应用场景非常广泛。
下一步建议:尝试将一个现有的同步项目改造为异步版本,在实战中巩固所学知识。
本文链接:https://www.kkkliao.cn/?id=899 转载需授权!
版权声明:本文由廖万里的博客发布,如需转载请注明出处。



手机流量卡
免费领卡
号卡合伙人
产品服务
关于本站
