当前位置:首页 > 学习笔记 > 正文内容

Python 异步编程与协程实战完全指南:从入门到精通

Python 异步编程是提升程序性能的关键技术。通过 async/await 语法和 asyncio 库,开发者可以轻松实现高并发处理,特别适合 I/O 密集型任务。本文将系统讲解协程原理、事件循环机制、任务调度等核心知识,并通过爬虫、Web 服务等实战案例帮助读者掌握异步编程精髓。

一、为什么需要异步编程?

在传统同步编程中,当程序执行 I/O 操作(如网络请求、文件读写)时,整个线程会被阻塞,等待操作完成。这在处理大量并发请求时效率极低。

异步编程的核心思想是:当遇到 I/O 等待时,不阻塞线程,而是切换去执行其他任务。这样可以充分利用 CPU 资源,大幅提升程序吞吐量。

1.1 同步 vs 异步对比

import timeimport asyncio# 同步方式:顺序执行,总耗时约 6 秒def sync_task(name, delay):    print(f"任务 {name} 开始")    time.sleep(delay)  # 阻塞等待    print(f"任务 {name} 完成")    return f"结果-{name}"def sync_main():    start = time.time()    sync_task("A", 2)    sync_task("B", 2)    sync_task("C", 2)    print(f"同步总耗时: {time.time() - start:.2f}秒")# 异步方式:并发执行,总耗时约 2 秒async def async_task(name, delay):    print(f"任务 {name} 开始")    await asyncio.sleep(delay)  # 非阻塞等待    print(f"任务 {name} 完成")    return f"结果-{name}"async def async_main():    start = time.time()    # 并发执行三个任务    results = await asyncio.gather(        async_task("A", 2),        async_task("B", 2),        async_task("C", 2)    )    print(f"异步总耗时: {time.time() - start:.2f}秒")    print(f"结果: {results}")# 运行对比if __name__ == "__main__":    sync_main()      # 输出: 同步总耗时: 6.00秒    asyncio.run(async_main())  # 输出: 异步总耗时: 2.00秒

1.2 异步编程的适用场景

  • 网络爬虫:同时抓取多个网页,大幅提升效率
  • Web 服务:处理大量并发请求,如 FastAPI、Sanic
  • 数据库操作:异步 ORM,如 Tortoise-ORM、SQLAlchemy async
  • 消息队列:异步消费和处理消息
  • 实时通信:WebSocket、聊天服务器

二、协程基础:async/await 语法

2.1 定义协程函数

使用 async def 定义的函数称为协程函数,调用它会返回一个协程对象。

import asyncio# 定义协程函数async def say_hello():    print("Hello")    await asyncio.sleep(1)  # 模拟异步操作    print("World")# 调用协程函数返回协程对象coro = say_hello()print(type(coro))  # <class 'coroutine'># 运行协程asyncio.run(say_hello())

2.2 await 关键字

await 用于挂起当前协程,等待另一个协程完成。在等待期间,事件循环会切换执行其他任务。

import asyncioasync def fetch_data(url):    print(f"正在获取 {url}")    await asyncio.sleep(1)  # 模拟网络请求    return f"{url} 的数据"async def main():    # await 会等待协程完成并获取返回值    data1 = await fetch_data("https://api.example.com/users")    data2 = await fetch_data("https://api.example.com/posts")    print(data1)    print(data2)asyncio.run(main())

2.3 协程的注意点

import asyncioasync def my_coroutine():    await asyncio.sleep(1)    return "完成"# 错误:直接调用不会执行result = my_coroutine()  # 只返回协程对象,不执行# 正确:使用 await 或 asyncio.run()async def main():    result = await my_coroutine()    print(result)asyncio.run(main())

三、asyncio 核心组件详解

3.1 事件循环(Event Loop)

事件循环是异步编程的核心,它负责调度和执行协程。可以把它理解为一个无限循环,不断检查是否有任务可以执行。

import asyncioasync def task(name, delay):    print(f"{name} 开始")    await asyncio.sleep(delay)    print(f"{name} 结束")    return f"{name} 结果"async def main():    # 获取当前事件循环    loop = asyncio.get_event_loop()    print(f"事件循环: {loop}")        # 创建任务并运行    task1 = asyncio.create_task(task("任务A", 1))    task2 = asyncio.create_task(task("任务B", 2))        # 等待所有任务完成    results = await asyncio.gather(task1, task2)    print(f"所有任务完成: {results}")asyncio.run(main())

3.2 Task 任务对象

Task 是协程的包装器,让协程可以在事件循环中并发执行。

import asyncioasync def countdown(name, n):    for i in range(n, 0, -1):        print(f"{name}: {i}")        await asyncio.sleep(0.5)    print(f"{name} 发射!")async def main():    # 方式1:create_task 创建任务(推荐)    task1 = asyncio.create_task(countdown("火箭A", 5))    task2 = asyncio.create_task(countdown("火箭B", 3))        # 方式2:ensure_future 创建任务    task3 = asyncio.ensure_future(countdown("火箭C", 4))        # 等待所有任务完成    await asyncio.gather(task1, task2, task3)asyncio.run(main())

3.3 任务控制:取消、超时、回调

import asyncioasync def long_running_task():    try:        print("开始长时间任务...")        await asyncio.sleep(10)        print("任务完成")    except asyncio.CancelledError:        print("任务被取消")        raiseasync def main():    # 创建任务    task = asyncio.create_task(long_running_task())        # 设置超时:2秒后取消    try:        result = await asyncio.wait_for(task, timeout=2.0)    except asyncio.TimeoutError:        print("任务超时")asyncio.run(main())

3.4 gather vs wait:并发控制

import asyncioasync def job(n):    await asyncio.sleep(1)    return f"任务{n}完成"async def main():    tasks = [job(i) for i in range(5)]        # gather:按顺序返回结果    results = await asyncio.gather(*tasks)    print(f"gather 结果: {results}")        # gather with return_exceptions:即使有异常也继续    async def might_fail(n):        if n == 2:            raise ValueError("故意失败")        return f"任务{n}完成"        tasks2 = [might_fail(i) for i in range(5)]    results2 = await asyncio.gather(*tasks2, return_exceptions=True)    print(f"包含异常的结果: {results2}")asyncio.run(main())

四、实战案例:高性能异步爬虫

4.1 使用 aiohttp 实现并发爬虫

import asyncioimport aiohttpimport timefrom typing import List, Dictclass AsyncSpider:    """异步爬虫类"""        def __init__(self, max_concurrent: int = 10):        self.max_concurrent = max_concurrent        self.semaphore = asyncio.Semaphore(max_concurrent)        async def fetch(self, session: aiohttp.ClientSession, url: str) -> Dict:        """单个请求"""        async with self.semaphore:  # 限制并发数            try:                async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:                    html = await response.text()                    return {                        "url": url,                        "status": response.status,                        "length": len(html)                    }            except Exception as e:                return {"url": url, "error": str(e)}        async def crawl(self, urls: List[str]) -> List[Dict]:        """批量爬取"""        connector = aiohttp.TCPConnector(limit=20)                async with aiohttp.ClientSession(connector=connector) as session:            tasks = [self.fetch(session, url) for url in urls]            results = await asyncio.gather(*tasks)                return resultsasync def main():    urls = [f"https://httpbin.org/delay/{i}" for i in range(1, 6)]        spider = AsyncSpider(max_concurrent=5)        start = time.time()    results = await spider.crawl(urls)    elapsed = time.time() - start        print(f"爬取 {len(urls)} 个页面,耗时 {elapsed:.2f}秒")if __name__ == "__main__":    asyncio.run(main())

4.2 异步爬取 + 数据处理管道

import asyncioimport aiohttpimport aiofilesimport jsonasync def fetch_page(session: aiohttp.ClientSession, url: str) -> str:    """获取页面内容"""    async with session.get(url) as response:        return await response.text()async def parse_articles(html: str) -> list:    """解析文章"""    await asyncio.sleep(0.1)  # 模拟解析    return [{"title": f"文章{i}", "content": "内容..."} for i in range(5)]async def save_to_file(articles: list, filename: str):    """异步保存到文件"""    async with aiofiles.open(filename, "w", encoding="utf-8") as f:        await f.write(json.dumps(articles, ensure_ascii=False, indent=2))    print(f"已保存 {len(articles)} 篇文章到 {filename}")async def pipeline(url: str, output_file: str):    """数据处理管道"""    async with aiohttp.ClientSession() as session:        html = await fetch_page(session, url)        articles = await parse_articles(html)        await save_to_file(articles, output_file)        return len(articles)# asyncio.run(pipeline("https://example.com", "output.json"))

五、异步上下文管理器与迭代器

5.1 异步上下文管理器

import asyncioclass AsyncDatabase:    """模拟异步数据库连接"""        async def __aenter__(self):        print("建立数据库连接...")        await asyncio.sleep(0.5)        return self        async def __aexit__(self, exc_type, exc_val, exc_tb):        print("关闭数据库连接...")        await asyncio.sleep(0.2)        async def query(self, sql: str):        print(f"执行查询: {sql}")        await asyncio.sleep(0.3)        return [{"id": 1, "name": "张三"}]async def main():    async with AsyncDatabase() as db:        result = await db.query("SELECT * FROM users")        print(f"查询结果: {result}")asyncio.run(main())

5.2 异步迭代器

import asyncioclass AsyncRange:    """异步范围迭代器"""        def __init__(self, count: int):        self.count = count        def __aiter__(self):        self.index = 0        return self        async def __anext__(self):        if self.index >= self.count:            raise StopAsyncIteration                await asyncio.sleep(0.1)        value = self.index        self.index += 1        return valueasync def main():    # 使用 async for 迭代    async for i in AsyncRange(5):        print(f"值: {i}")        # 或使用异步推导式    result = [i async for i in AsyncRange(5)]    print(f"列表: {result}")asyncio.run(main())

六、常见问题与最佳实践

6.1 避免阻塞事件循环

import asyncioimport time# 错误:使用阻塞函数async def bad_example():    time.sleep(5)  # 阻塞整个事件循环!# 正确:使用异步版本async def good_example():    await asyncio.sleep(5)  # 非阻塞# 正确:将阻塞操作放入线程池async def run_blocking():    loop = asyncio.get_event_loop()    result = await loop.run_in_executor(None, time.sleep, 5)    return result

6.2 错误处理最佳实践

import asyncioasync def risky_task(n):    """可能失败的任务"""    if n == 3:        raise ValueError(f"任务 {n} 失败")    await asyncio.sleep(0.5)    return f"任务 {n} 成功"async def main():    tasks = [risky_task(i) for i in range(1, 6)]        # 捕获单个任务异常    for task in asyncio.as_completed(tasks):        try:            result = await task            print(f"成功: {result}")        except ValueError as e:            print(f"失败: {e}")asyncio.run(main())

总结

Python 异步编程是提升程序性能的重要技能,特别适合 I/O 密集型场景。本文核心要点:

  1. async/await 语法:使用 async def 定义协程,await 等待异步操作
  2. 事件循环:异步编程的核心调度器,通过 asyncio.run() 启动
  3. Task 任务:包装协程实现并发执行,使用 create_task() 创建
  4. 并发控制:gather() 按顺序返回结果,wait() 更灵活
  5. 最佳实践:避免阻塞事件循环,合理使用线程池处理 CPU 密集型任务

掌握异步编程后,可以轻松构建高性能爬虫、Web 服务、实时通信系统等应用。建议结合 FastAPI、aiohttp 等框架进行实战练习,深入理解异步编程的精髓。

本文链接:https://www.kkkliao.cn/?id=917 转载需授权!

分享到:

版权声明:本文由廖万里的博客发布,如需转载请注明出处。


“Python 异步编程与协程实战完全指南:从入门到精通” 的相关文章

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。