Python 异步编程与协程实战完全指南:从入门到精通
Python 异步编程是提升程序性能的关键技术。通过 async/await 语法和 asyncio 库,开发者可以轻松实现高并发处理,特别适合 I/O 密集型任务。本文将系统讲解协程原理、事件循环机制、任务调度等核心知识,并通过爬虫、Web 服务等实战案例帮助读者掌握异步编程精髓。
一、为什么需要异步编程?
在传统同步编程中,当程序执行 I/O 操作(如网络请求、文件读写)时,整个线程会被阻塞,等待操作完成。这在处理大量并发请求时效率极低。
异步编程的核心思想是:当遇到 I/O 等待时,不阻塞线程,而是切换去执行其他任务。这样可以充分利用 CPU 资源,大幅提升程序吞吐量。
1.1 同步 vs 异步对比
import timeimport asyncio# 同步方式:顺序执行,总耗时约 6 秒def sync_task(name, delay): print(f"任务 {name} 开始") time.sleep(delay) # 阻塞等待 print(f"任务 {name} 完成") return f"结果-{name}"def sync_main(): start = time.time() sync_task("A", 2) sync_task("B", 2) sync_task("C", 2) print(f"同步总耗时: {time.time() - start:.2f}秒")# 异步方式:并发执行,总耗时约 2 秒async def async_task(name, delay): print(f"任务 {name} 开始") await asyncio.sleep(delay) # 非阻塞等待 print(f"任务 {name} 完成") return f"结果-{name}"async def async_main(): start = time.time() # 并发执行三个任务 results = await asyncio.gather( async_task("A", 2), async_task("B", 2), async_task("C", 2) ) print(f"异步总耗时: {time.time() - start:.2f}秒") print(f"结果: {results}")# 运行对比if __name__ == "__main__": sync_main() # 输出: 同步总耗时: 6.00秒 asyncio.run(async_main()) # 输出: 异步总耗时: 2.00秒1.2 异步编程的适用场景
- 网络爬虫:同时抓取多个网页,大幅提升效率
- Web 服务:处理大量并发请求,如 FastAPI、Sanic
- 数据库操作:异步 ORM,如 Tortoise-ORM、SQLAlchemy async
- 消息队列:异步消费和处理消息
- 实时通信:WebSocket、聊天服务器
二、协程基础:async/await 语法
2.1 定义协程函数
使用 async def 定义的函数称为协程函数,调用它会返回一个协程对象。
import asyncio# 定义协程函数async def say_hello(): print("Hello") await asyncio.sleep(1) # 模拟异步操作 print("World")# 调用协程函数返回协程对象coro = say_hello()print(type(coro)) # <class 'coroutine'># 运行协程asyncio.run(say_hello())2.2 await 关键字
await 用于挂起当前协程,等待另一个协程完成。在等待期间,事件循环会切换执行其他任务。
import asyncioasync def fetch_data(url): print(f"正在获取 {url}") await asyncio.sleep(1) # 模拟网络请求 return f"{url} 的数据"async def main(): # await 会等待协程完成并获取返回值 data1 = await fetch_data("https://api.example.com/users") data2 = await fetch_data("https://api.example.com/posts") print(data1) print(data2)asyncio.run(main())2.3 协程的注意点
import asyncioasync def my_coroutine(): await asyncio.sleep(1) return "完成"# 错误:直接调用不会执行result = my_coroutine() # 只返回协程对象,不执行# 正确:使用 await 或 asyncio.run()async def main(): result = await my_coroutine() print(result)asyncio.run(main())
三、asyncio 核心组件详解
3.1 事件循环(Event Loop)
事件循环是异步编程的核心,它负责调度和执行协程。可以把它理解为一个无限循环,不断检查是否有任务可以执行。
import asyncioasync def task(name, delay): print(f"{name} 开始") await asyncio.sleep(delay) print(f"{name} 结束") return f"{name} 结果"async def main(): # 获取当前事件循环 loop = asyncio.get_event_loop() print(f"事件循环: {loop}") # 创建任务并运行 task1 = asyncio.create_task(task("任务A", 1)) task2 = asyncio.create_task(task("任务B", 2)) # 等待所有任务完成 results = await asyncio.gather(task1, task2) print(f"所有任务完成: {results}")asyncio.run(main())3.2 Task 任务对象
Task 是协程的包装器,让协程可以在事件循环中并发执行。
import asyncioasync def countdown(name, n): for i in range(n, 0, -1): print(f"{name}: {i}") await asyncio.sleep(0.5) print(f"{name} 发射!")async def main(): # 方式1:create_task 创建任务(推荐) task1 = asyncio.create_task(countdown("火箭A", 5)) task2 = asyncio.create_task(countdown("火箭B", 3)) # 方式2:ensure_future 创建任务 task3 = asyncio.ensure_future(countdown("火箭C", 4)) # 等待所有任务完成 await asyncio.gather(task1, task2, task3)asyncio.run(main())3.3 任务控制:取消、超时、回调
import asyncioasync def long_running_task(): try: print("开始长时间任务...") await asyncio.sleep(10) print("任务完成") except asyncio.CancelledError: print("任务被取消") raiseasync def main(): # 创建任务 task = asyncio.create_task(long_running_task()) # 设置超时:2秒后取消 try: result = await asyncio.wait_for(task, timeout=2.0) except asyncio.TimeoutError: print("任务超时")asyncio.run(main())3.4 gather vs wait:并发控制
import asyncioasync def job(n): await asyncio.sleep(1) return f"任务{n}完成"async def main(): tasks = [job(i) for i in range(5)] # gather:按顺序返回结果 results = await asyncio.gather(*tasks) print(f"gather 结果: {results}") # gather with return_exceptions:即使有异常也继续 async def might_fail(n): if n == 2: raise ValueError("故意失败") return f"任务{n}完成" tasks2 = [might_fail(i) for i in range(5)] results2 = await asyncio.gather(*tasks2, return_exceptions=True) print(f"包含异常的结果: {results2}")asyncio.run(main())四、实战案例:高性能异步爬虫
4.1 使用 aiohttp 实现并发爬虫
import asyncioimport aiohttpimport timefrom typing import List, Dictclass AsyncSpider: """异步爬虫类""" def __init__(self, max_concurrent: int = 10): self.max_concurrent = max_concurrent self.semaphore = asyncio.Semaphore(max_concurrent) async def fetch(self, session: aiohttp.ClientSession, url: str) -> Dict: """单个请求""" async with self.semaphore: # 限制并发数 try: async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response: html = await response.text() return { "url": url, "status": response.status, "length": len(html) } except Exception as e: return {"url": url, "error": str(e)} async def crawl(self, urls: List[str]) -> List[Dict]: """批量爬取""" connector = aiohttp.TCPConnector(limit=20) async with aiohttp.ClientSession(connector=connector) as session: tasks = [self.fetch(session, url) for url in urls] results = await asyncio.gather(*tasks) return resultsasync def main(): urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)] spider = AsyncSpider(max_concurrent=5) start = time.time() results = await spider.crawl(urls) elapsed = time.time() - start print(f"爬取 {len(urls)} 个页面,耗时 {elapsed:.2f}秒")if __name__ == "__main__": asyncio.run(main())4.2 异步爬取 + 数据处理管道
import asyncioimport aiohttpimport aiofilesimport jsonasync def fetch_page(session: aiohttp.ClientSession, url: str) -> str: """获取页面内容""" async with session.get(url) as response: return await response.text()async def parse_articles(html: str) -> list: """解析文章""" await asyncio.sleep(0.1) # 模拟解析 return [{"title": f"文章{i}", "content": "内容..."} for i in range(5)]async def save_to_file(articles: list, filename: str): """异步保存到文件""" async with aiofiles.open(filename, "w", encoding="utf-8") as f: await f.write(json.dumps(articles, ensure_ascii=False, indent=2)) print(f"已保存 {len(articles)} 篇文章到 {filename}")async def pipeline(url: str, output_file: str): """数据处理管道""" async with aiohttp.ClientSession() as session: html = await fetch_page(session, url) articles = await parse_articles(html) await save_to_file(articles, output_file) return len(articles)# asyncio.run(pipeline("https://example.com", "output.json"))五、异步上下文管理器与迭代器
5.1 异步上下文管理器
import asyncioclass AsyncDatabase: """模拟异步数据库连接""" async def __aenter__(self): print("建立数据库连接...") await asyncio.sleep(0.5) return self async def __aexit__(self, exc_type, exc_val, exc_tb): print("关闭数据库连接...") await asyncio.sleep(0.2) async def query(self, sql: str): print(f"执行查询: {sql}") await asyncio.sleep(0.3) return [{"id": 1, "name": "张三"}]async def main(): async with AsyncDatabase() as db: result = await db.query("SELECT * FROM users") print(f"查询结果: {result}")asyncio.run(main())5.2 异步迭代器
import asyncioclass AsyncRange: """异步范围迭代器""" def __init__(self, count: int): self.count = count def __aiter__(self): self.index = 0 return self async def __anext__(self): if self.index >= self.count: raise StopAsyncIteration await asyncio.sleep(0.1) value = self.index self.index += 1 return valueasync def main(): # 使用 async for 迭代 async for i in AsyncRange(5): print(f"值: {i}") # 或使用异步推导式 result = [i async for i in AsyncRange(5)] print(f"列表: {result}")asyncio.run(main())六、常见问题与最佳实践
6.1 避免阻塞事件循环
import asyncioimport time# 错误:使用阻塞函数async def bad_example(): time.sleep(5) # 阻塞整个事件循环!# 正确:使用异步版本async def good_example(): await asyncio.sleep(5) # 非阻塞# 正确:将阻塞操作放入线程池async def run_blocking(): loop = asyncio.get_event_loop() result = await loop.run_in_executor(None, time.sleep, 5) return result
6.2 错误处理最佳实践
import asyncioasync def risky_task(n): """可能失败的任务""" if n == 3: raise ValueError(f"任务 {n} 失败") await asyncio.sleep(0.5) return f"任务 {n} 成功"async def main(): tasks = [risky_task(i) for i in range(1, 6)] # 捕获单个任务异常 for task in asyncio.as_completed(tasks): try: result = await task print(f"成功: {result}") except ValueError as e: print(f"失败: {e}")asyncio.run(main())总结
Python 异步编程是提升程序性能的重要技能,特别适合 I/O 密集型场景。本文核心要点:
- async/await 语法:使用 async def 定义协程,await 等待异步操作
- 事件循环:异步编程的核心调度器,通过 asyncio.run() 启动
- Task 任务:包装协程实现并发执行,使用 create_task() 创建
- 并发控制:gather() 按顺序返回结果,wait() 更灵活
- 最佳实践:避免阻塞事件循环,合理使用线程池处理 CPU 密集型任务
掌握异步编程后,可以轻松构建高性能爬虫、Web 服务、实时通信系统等应用。建议结合 FastAPI、aiohttp 等框架进行实战练习,深入理解异步编程的精髓。
本文链接:https://www.kkkliao.cn/?id=917 转载需授权!
版权声明:本文由廖万里的博客发布,如需转载请注明出处。



手机流量卡
免费领卡
号卡合伙人
产品服务
关于本站
