当前位置:首页 > 学习笔记 > 正文内容

Python 异步编程入门教程:asyncio 与高并发实战

廖万里17小时前学习笔记0

Python 异步编程是提升并发性能的关键技术,asyncio 库让单线程处理成千上万并发请求成为可能。本文从原理到实战,带你掌握异步编程的核心精髓。

一、为什么需要异步编程?

在传统的同步编程模型中,当程序执行 I/O 操作(如网络请求、文件读写、数据库查询)时,线程会被阻塞,等待操作完成才能继续执行。这种模式下,一个线程同一时间只能处理一个任务,效率低下。

假设你要请求 100 个网页:

# 同步方式:串行执行,总耗时 = 所有请求时间之和
import requests
import time

def fetch_sync(urls):
    results = []
    start = time.time()
    for url in urls:
        response = requests.get(url)  # 阻塞等待
        results.append(response.status_code)
    print(f"同步耗时: {time.time() - start:.2f}秒")
    return results

# 100 个请求,每个 1 秒,总耗时约 100 秒

而异步编程的核心思想是:当遇到 I/O 操作时,不等待其完成,而是转去执行其他任务。等 I/O 完成后再回来处理结果。这样单线程也能同时处理多个任务:

# 异步方式:并发执行,总耗时 ≈ 最慢的那个请求
import asyncio
import aiohttp
import time

async def fetch_async(urls):
    results = []
    start = time.time()
    
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_one(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
    
    print(f"异步耗时: {time.time() - start:.2f}秒")
    return results

async def fetch_one(session, url):
    async with session.get(url) as response:
        return response.status

# 100 个请求,每个 1 秒,总耗时约 1-2 秒!

这就是异步编程的威力:用单线程实现高并发,大大提升 I/O 密集型任务的效率

二、asyncio 核心概念

2.1 协程(Coroutine)

协程是异步编程的基本单位。在 Python 中,使用 async def 定义的函数就是协程函数,调用它返回协程对象:

import asyncio

# 定义协程函数
async def say_hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟 I/O 操作
    print("World")

# 调用协程函数,返回协程对象(不会立即执行)
coro = say_hello()
print(type(coro))  # <class 'coroutine'>

# 运行协程
asyncio.run(say_hello())

2.2 await 关键字

await 用于等待一个协程完成,并获取其返回值。它只能在 async def 函数内部使用:

async def get_data():
    await asyncio.sleep(1)  # 等待模拟的 I/O 操作
    return {"status": "ok", "data": [1, 2, 3]}

async def main():
    # await 暂停当前协程,等待 get_data 完成
    result = await get_data()
    print(result)  # {"status": "ok", "data": [1, 2, 3]}

asyncio.run(main())

关键理解await 不是阻塞,而是让出控制权。当协程执行到 await 时,它会暂停自己,事件循环转而去执行其他协程。

2.3 事件循环(Event Loop)

事件循环是异步编程的心脏,负责调度和执行协程。它维护一个任务队列,不断从队列中取出任务执行:

import asyncio

async def task(name, delay):
    print(f"任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"任务 {name} 完成")
    return f"结果-{name}"

async def main():
    # 创建多个任务,并发执行
    tasks = [
        asyncio.create_task(task("A", 2)),
        asyncio.create_task(task("B", 1)),
        asyncio.create_task(task("C", 3)),
    ]
    
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    print(f"所有结果: {results}")

asyncio.run(main())

# 输出顺序:B -> A -> C(按完成时间,非启动顺序)

2.4 任务(Task)

Task 是协程的包装器,让协程可以在事件循环中调度执行:

async def my_coroutine():
    await asyncio.sleep(1)
    return "完成"

async def main():
    # 方式1:create_task(推荐)
    task1 = asyncio.create_task(my_coroutine())
    
    # 方式2:ensure_future
    task2 = asyncio.ensure_future(my_coroutine())
    
    # 等待任务完成
    result1 = await task1
    result2 = await task2
    
    print(result1, result2)

asyncio.run(main())

三、并发执行模式

3.1 gather:并行等待多个协程

asyncio.gather() 是最常用的并发执行方式,它会并行启动多个协程并等待全部完成:

import asyncio
import time

async def download_file(file_id):
    print(f"开始下载文件 {file_id}")
    await asyncio.sleep(1)  # 模拟下载
    print(f"文件 {file_id} 下载完成")
    return f"文件{file_id}的内容"

async def main():
    start = time.time()
    
    # 并行下载 5 个文件
    results = await asyncio.gather(
        download_file(1),
        download_file(2),
        download_file(3),
        download_file(4),
        download_file(5),
    )
    
    print(f"总耗时: {time.time() - start:.2f}秒")  # 约 1 秒
    print(f"结果数量: {len(results)}")

asyncio.run(main())

3.2 as_completed:按完成顺序获取结果

当需要按完成顺序处理结果时,使用 asyncio.as_completed()

import asyncio
import random

async def fetch_data(source):
    delay = random.uniform(0.5, 2)
    await asyncio.sleep(delay)
    return f"数据来自 {source},耗时 {delay:.2f}秒"

async def main():
    sources = ["API-1", "API-2", "API-3", "API-4"]
    
    # 创建任务列表
    tasks = [asyncio.create_task(fetch_data(s)) for s in sources]
    
    # 按完成顺序处理结果
    for coro in asyncio.as_completed(tasks):
        result = await coro
        print(f"收到: {result}")

asyncio.run(main())

3.3 wait:更灵活的等待控制

asyncio.wait() 提供更精细的控制,可以设置超时和返回条件:

import asyncio

async def long_task(n):
    await asyncio.sleep(n)
    return f"任务耗时 {n} 秒"

async def main():
    tasks = {
        asyncio.create_task(long_task(1)),
        asyncio.create_task(long_task(2)),
        asyncio.create_task(long_task(5)),
    }
    
    # 等待第一个完成就返回
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.FIRST_COMPLETED
    )
    
    print(f"已完成: {len(done)}")
    print(f"进行中: {len(pending)}")
    
    # 取消未完成的任务
    for task in pending:
        task.cancel()

asyncio.run(main())

3.4 信号量控制并发数

当需要限制并发数量(如 API 有频率限制)时,使用 Semaphore

import asyncio
import aiohttp

async def fetch_with_limit(semaphore, session, url):
    async with semaphore:  # 获取信号量,限制并发
        async with session.get(url) as response:
            return await response.text()

async def main():
    # 限制最多 5 个并发请求
    semaphore = asyncio.Semaphore(5)
    
    urls = [f"https://httpbin.org/delay/1" for _ in range(20)]
    
    async with aiohttp.ClientSession() as session:
        tasks = [
            fetch_with_limit(semaphore, session, url) 
            for url in urls
        ]
        results = await asyncio.gather(*tasks)
        print(f"完成 {len(results)} 个请求")

asyncio.run(main())

四、实战案例:异步爬虫

下面是一个完整的异步爬虫示例,展示如何高效爬取多个网页:

import asyncio
import aiohttp
import time
from typing import List, Dict
import json

class AsyncSpider:
    """异步爬虫类"""
    
    def __init__(self, max_concurrent: int = 10):
        self.semaphore = asyncio.Semaphore(max_concurrent)
        self.results: List[Dict] = []
        
    async def fetch_page(self, session: aiohttp.ClientSession, url: str) -> Dict:
        """抓取单个页面"""
        async with self.semaphore:
            try:
                async with session.get(url, timeout=aiohttp.ClientTimeout(total=30)) as response:
                    html = await response.text()
                    return {
                        "url": url,
                        "status": response.status,
                        "length": len(html),
                        "success": True
                    }
            except Exception as e:
                return {
                    "url": url,
                    "error": str(e),
                    "success": False
                }
    
    async def crawl(self, urls: List[str]) -> List[Dict]:
        """并发爬取多个 URL"""
        connector = aiohttp.TCPConnector(limit=20)  # 连接池大小
        
        async with aiohttp.ClientSession(connector=connector) as session:
            tasks = [self.fetch_page(session, url) for url in urls]
            self.results = await asyncio.gather(*tasks)
        
        return self.results
    
    def print_stats(self):
        """打印统计信息"""
        success = sum(1 for r in self.results if r["success"])
        failed = len(self.results) - success
        total_size = sum(r.get("length", 0) for r in self.results if r["success"])
        
        print(f"\n=== 爬取统计 ===")
        print(f"成功: {success}, 失败: {failed}")
        print(f"总数据量: {total_size / 1024:.2f} KB")

async def main():
    # 测试 URL 列表
    test_urls = [
        "https://httpbin.org/get",
        "https://httpbin.org/ip",
        "https://httpbin.org/headers",
        "https://httpbin.org/user-agent",
        "https://httpbin.org/json",
    ] * 4  # 20 个请求
    
    spider = AsyncSpider(max_concurrent=5)
    
    start = time.time()
    results = await spider.crawl(test_urls)
    elapsed = time.time() - start
    
    spider.print_stats()
    print(f"总耗时: {elapsed:.2f}秒")
    print(f"平均每请求: {elapsed/len(test_urls)*1000:.0f}毫秒")

if __name__ == "__main__":
    asyncio.run(main())

五、异步上下文管理器与迭代器

5.1 异步上下文管理器

使用 async with 进入异步上下文管理器:

import asyncio

class AsyncDatabase:
    """模拟异步数据库连接"""
    
    async def __aenter__(self):
        print("建立数据库连接...")
        await asyncio.sleep(0.1)
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭数据库连接...")
        await asyncio.sleep(0.1)
    
    async def query(self, sql):
        await asyncio.sleep(0.05)
        return f"查询结果: {sql}"

async def main():
    async with AsyncDatabase() as db:
        result = await db.query("SELECT * FROM users")
        print(result)

asyncio.run(main())

5.2 异步迭代器

使用 async for 遍历异步迭代器:

import asyncio

class AsyncRange:
    """异步范围迭代器"""
    
    def __init__(self, count):
        self.count = count
    
    def __aiter__(self):
        self.current = 0
        return self
    
    async def __anext__(self):
        if self.current >= self.count:
            raise StopAsyncIteration
        
        await asyncio.sleep(0.1)  # 模拟异步获取
        value = self.current
        self.current += 1
        return value

async def main():
    async for num in AsyncRange(5):
        print(f"获取到: {num}")

asyncio.run(main())

六、常见陷阱与最佳实践

6.1 不要在异步代码中使用同步阻塞调用

# ❌ 错误:会阻塞事件循环
async def bad_example():
    time.sleep(5)  # 阻塞整个事件循环!
    return "完成"

# ✅ 正确:使用异步版本
async def good_example():
    await asyncio.sleep(5)  # 不阻塞,让出控制权
    return "完成"

6.2 CPU 密集型任务要用线程池

异步适合 I/O 密集型任务,CPU 密集型任务应放到线程池:

import asyncio
from concurrent.futures import ProcessPoolExecutor

def cpu_intensive(n):
    """CPU 密集型计算"""
    return sum(i * i for i in range(n))

async def main():
    # 在进程池中运行 CPU 密集型任务
    loop = asyncio.get_event_loop()
    
    with ProcessPoolExecutor() as executor:
        result = await loop.run_in_executor(
            executor, cpu_intensive, 10_000_000
        )
    
    print(f"计算结果: {result}")

asyncio.run(main())

6.3 正确处理异常

import asyncio

async def may_fail(n):
    if n == 3:
        raise ValueError(f"数字 {n} 不合法")
    await asyncio.sleep(0.1)
    return n * 2

async def main():
    # 方式1:gather 时返回异常而非抛出
    results = await asyncio.gather(
        *[may_fail(i) for i in range(5)],
        return_exceptions=True  # 关键!
    )
    
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务 {i} 失败: {result}")
        else:
            print(f"任务 {i} 成功: {result}")

asyncio.run(main())

七、总结

Python 异步编程的核心要点:

  1. 适用场景:I/O 密集型任务(网络请求、文件读写、数据库查询)
  2. 三大核心:协程(async def)、事件循环、任务(Task)
  3. 关键字:async 定义协程,await 让出控制权
  4. 并发工具:gather 并行执行、as_completed 按完成顺序处理、Semaphore 限制并发
  5. 避坑指南:不用阻塞调用、CPU 任务用线程池、正确处理异常

掌握异步编程,让你的 Python 程序性能提升 10 倍不再是梦。在实际项目中,结合 aiohttp、aiomysql、aioredis 等异步库,可以构建高性能的 Web 服务、爬虫和数据处理管道。

下一篇文章,我们将深入探讨 FastAPI 异步 Web 框架的实战应用,敬请期待!

本文链接:https://www.kkkliao.cn/?id=978 转载需授权!

分享到:

版权声明:本文由廖万里的博客发布,如需转载请注明出处。


发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。