当前位置:首页 > 学习笔记 > 正文内容

Python 异步编程完全指南:从入门到精通

异步编程是现代 Python 开发中提升并发性能的核心技术。掌握 async/await 语法,理解事件循环机制,能够让你的程序在 I/O 密集型场景下性能提升数倍甚至数十倍。

一、核心概念

在深入异步编程之前,我们需要理解几个关键概念:

1. 同步 vs 异步

同步编程是传统的执行方式,代码按顺序一行一行执行,遇到耗时操作(如网络请求、文件读写)时会阻塞,等待操作完成后才继续执行下一行。这就像在餐厅点餐,你必须等前一桌客人点完餐才能轮到你。

异步编程则不同,当遇到耗时操作时,程序不会傻等,而是先去执行其他任务,等耗时操作完成后再回来处理结果。这就像在快餐店点餐后拿个号码牌,你可以先找座位、玩手机,等叫号了再去取餐。

2. 协程(Coroutine)

协程是异步编程的核心概念。它是一种轻量级的"线程",由程序自身控制切换,而不是由操作系统调度。Python 使用 async/await 语法来定义和使用协程。

import asyncio

# 定义一个协程函数
async def say_hello():
    print("Hello")
    # 模拟耗时操作,暂停1秒
    await asyncio.sleep(1)
    print("World")

# 运行协程
asyncio.run(say_hello())

3. 事件循环(Event Loop)

事件循环是异步编程的"心脏"。它负责调度和执行协程,管理 I/O 事件,在合适的时机切换任务。可以把事件循环想象成一个不停旋转的轮子,每个任务就像轮子上的一个槽位,轮子转到哪个槽位就执行哪个任务。

二、基础语法详解

1. async 关键字

async 用于定义协程函数。被 async 修饰的函数调用后返回一个协程对象,而不是立即执行。

import asyncio

async def my_coroutine():
    print("这是一个协程")
    return "协程返回值"

# 调用协程函数,得到协程对象
coro = my_coroutine()
print(type(coro))  # 

# 运行协程
result = asyncio.run(my_coroutine())
print(result)  # 协程返回值

2. await 关键字

await 用于等待一个可等待对象(协程、Task、Future)完成,并获取其返回值。await 只能在 async 函数内部使用。

import asyncio

async def fetch_data(url):
    # 模拟网络请求耗时
    await asyncio.sleep(2)
    return f"来自 {url} 的数据"

async def main():
    # await 会暂停当前协程,直到 fetch_data 完成
    data = await fetch_data("https://example.com")
    print(data)

asyncio.run(main())

3. asyncio.run() 启动入口

asyncio.run() 是 Python 3.7+ 推荐的启动异步程序的方式。它会自动创建事件循环、运行协程、关闭事件循环。

import asyncio

async def main():
    print("异步程序开始")
    await asyncio.sleep(1)
    print("异步程序结束")

# 推荐方式:自动管理事件循环
asyncio.run(main())

三、并发执行多个任务

异步编程的真正威力在于并发执行多个任务。当多个任务都需要等待 I/O 时,可以让它们同时进行,而不是一个接一个。

1. asyncio.gather() 并发执行

import asyncio
import time

async def download_file(name, delay):
    print(f"开始下载 {name}")
    await asyncio.sleep(delay)  # 模拟下载耗时
    print(f"下载完成 {name}")
    return f"{name} 的内容"

async def main():
    start = time.time()
    
    # 并发执行3个下载任务
    results = await asyncio.gather(
        download_file("文件A", 2),
        download_file("文件B", 3),
        download_file("文件C", 1),
    )
    
    print(f"总耗时: {time.time() - start:.1f}秒")
    print(f"结果: {results}")

asyncio.run(main())

# 输出:
# 开始下载 文件A
# 开始下载 文件B
# 开始下载 文件C
# 下载完成 文件C
# 下载完成 文件A
# 下载完成 文件B
# 总耗时: 3.0秒(而不是 2+3+1=6秒)

2. asyncio.create_task() 创建任务

import asyncio

async def background_task(name):
    for i in range(5):
        await asyncio.sleep(1)
        print(f"{name}: 第 {i+1} 次执行")

async def main():
    # 创建后台任务,立即返回,不等待完成
    task1 = asyncio.create_task(background_task("任务1"))
    task2 = asyncio.create_task(background_task("任务2"))
    
    print("主任务开始")
    await asyncio.sleep(3)
    print("主任务结束")
    
    # 等待所有任务完成
    await asyncio.gather(task1, task2)

asyncio.run(main())

3. asyncio.wait() 灵活控制

import asyncio

async def task(name, delay):
    await asyncio.sleep(delay)
    return f"{name} 完成"

async def main():
    tasks = [
        asyncio.create_task(task("A", 1)),
        asyncio.create_task(task("B", 2)),
        asyncio.create_task(task("C", 3)),
    ]
    
    # 等待第一个完成的任务
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print(f"第一个完成的任务: {done.pop().result()}")
    
    # 取消剩余任务
    for t in pending:
        t.cancel()

asyncio.run(main())

四、异步网络请求实战

异步编程最常见的场景是网络请求。使用 aiohttp 库可以高效地并发请求多个 URL。

import asyncio
import aiohttp
import time

async def fetch_url(session, url):
    """异步获取单个URL的内容"""
    try:
        async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as response:
            return await response.text()
    except Exception as e:
        return f"请求 {url} 失败: {e}"

async def fetch_all_urls(urls):
    """并发获取多个URL"""
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results

async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
    ]
    
    start = time.time()
    results = await fetch_all_urls(urls)
    
    for i, result in enumerate(results):
        print(f"URL {i+1}: {len(result)} 字符")
    
    print(f"总耗时: {time.time() - start:.2f}秒")

asyncio.run(main())

五、异步文件操作

使用 aiofiles 库可以进行异步文件读写,避免阻塞事件循环。

import asyncio
import aiofiles

async def write_file(filename, content):
    """异步写入文件"""
    async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
        await f.write(content)
    print(f"已写入 {filename}")

async def read_file(filename):
    """异步读取文件"""
    async with aiofiles.open(filename, 'r', encoding='utf-8') as f:
        content = await f.read()
    return content

async def main():
    # 并发写入多个文件
    await asyncio.gather(
        write_file("file1.txt", "内容1"),
        write_file("file2.txt", "内容2"),
        write_file("file3.txt", "内容3"),
    )
    
    # 并发读取
    contents = await asyncio.gather(
        read_file("file1.txt"),
        read_file("file2.txt"),
        read_file("file3.txt"),
    )
    
    print(contents)

asyncio.run(main())

六、异步上下文管理器与迭代器

1. 异步上下文管理器

import asyncio

class AsyncTimer:
    """异步计时器上下文管理器"""
    
    async def __aenter__(self):
        self.start = asyncio.get_event_loop().time()
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        elapsed = asyncio.get_event_loop().time() - self.start
        print(f"耗时: {elapsed:.2f}秒")

async def main():
    async with AsyncTimer():
        await asyncio.sleep(1)
        print("执行中...")

asyncio.run(main())

2. 异步迭代器

import asyncio

class AsyncRange:
    """异步范围迭代器"""
    
    def __init__(self, count):
        self.count = count
    
    def __aiter__(self):
        self.i = 0
        return self
    
    async def __anext__(self):
        if self.i >= self.count:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)  # 模拟异步操作
        self.i += 1
        return self.i

async def main():
    async for num in AsyncRange(5):
        print(num)

asyncio.run(main())

七、异步队列与生产者消费者模式

import asyncio
import random

async def producer(queue, producer_id):
    """生产者:向队列添加数据"""
    for i in range(5):
        item = f"生产者{producer_id}-商品{i}"
        await queue.put(item)
        print(f"[生产] {item}")
        await asyncio.sleep(random.uniform(0.1, 0.5))

async def consumer(queue, consumer_id):
    """消费者:从队列获取数据"""
    while True:
        item = await queue.get()
        print(f"[消费{consumer_id}] 处理: {item}")
        await asyncio.sleep(random.uniform(0.2, 0.8))
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=10)
    
    # 启动生产者
    producers = [
        asyncio.create_task(producer(queue, i))
        for i in range(2)
    ]
    
    # 启动消费者
    consumers = [
        asyncio.create_task(consumer(queue, i))
        for i in range(3)
    ]
    
    # 等待生产者完成
    await asyncio.gather(*producers)
    
    # 等待队列清空
    await queue.join()
    
    # 取消消费者
    for c in consumers:
        c.cancel()

asyncio.run(main())

八、常见陷阱与最佳实践

1. 避免阻塞事件循环

在异步函数中使用同步的阻塞操作(如 time.sleep、requests.get)会阻塞整个事件循环,导致所有异步任务都无法执行。

# 错误示范
import time

async def bad_example():
    time.sleep(5)  # 阻塞整个事件循环5秒!
    
# 正确做法
import asyncio

async def good_example():
    await asyncio.sleep(5)  # 不阻塞,允许其他任务运行

2. 正确处理异常

import asyncio

async def may_fail():
    raise ValueError("出错了")

async def main():
    try:
        await may_fail()
    except ValueError as e:
        print(f"捕获异常: {e}")
    
    # 或者使用 return_exceptions=True
    results = await asyncio.gather(
        may_fail(),
        asyncio.sleep(1),
        return_exceptions=True
    )
    print(results)  # [ValueError('出错了'), None]

asyncio.run(main())

3. 限制并发数量

使用信号量(Semaphore)限制同时运行的任务数量,避免资源耗尽。

import asyncio

async def limited_task(semaphore, task_id):
    async with semaphore:
        print(f"任务 {task_id} 开始")
        await asyncio.sleep(1)
        print(f"任务 {task_id} 完成")

async def main():
    # 最多同时运行3个任务
    semaphore = asyncio.Semaphore(3)
    
    tasks = [
        limited_task(semaphore, i)
        for i in range(10)
    ]
    
    await asyncio.gather(*tasks)

asyncio.run(main())

九、性能对比实测

import asyncio
import time
import requests  # 同步请求库
import aiohttp   # 异步请求库

# 同步版本
def sync_fetch(urls):
    start = time.time()
    results = []
    for url in urls:
        response = requests.get(url)
        results.append(len(response.text))
    return time.time() - start

# 异步版本
async def async_fetch(urls):
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [session.get(url) for url in urls]
        responses = await asyncio.gather(*tasks)
        results = [len(await r.text()) for r in responses]
    return time.time() - start

# 测试10个请求
urls = ["https://httpbin.org/delay/1"] * 10

# 同步:约10秒(串行执行)
# 异步:约1秒(并发执行)

十、总结

Python 异步编程是提升 I/O 密集型应用性能的利器。核心要点回顾:

1. 理解协程:async 定义协程,await 等待结果

2. 善用并发:asyncio.gather() 并发执行多个任务

3. 避免阻塞:使用异步版本的库,不要在异步函数中使用同步阻塞操作

4. 处理异常:正确捕获异常,使用 return_exceptions=True 获取所有结果

5. 控制并发:使用 Semaphore 限制并发数量,保护资源

异步编程的学习曲线稍陡,但一旦掌握,你将能够编写出高性能、高并发的 Python 应用。从爬虫、API 服务到实时通信,异步编程的应用场景非常广泛。

下一步建议:尝试将一个现有的同步项目改造为异步版本,在实战中巩固所学知识。

本文链接:https://www.kkkliao.cn/?id=899 转载需授权!

分享到:

版权声明:本文由廖万里的博客发布,如需转载请注明出处。


返回列表

上一篇:Docker 容器化部署完全指南

没有最新的文章了...

发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。