Python 进阶教程:异步编程 asyncio 详解

前言

同步代码执行时,CPU 经常处于”空闲等待”状态——等待网络响应、等待文件读取、等待数据库查询。传统的多线程/多进程虽然能解决这个问题,但带来了复杂性:线程安全、死锁、资源竞争。

asyncio 提供了第三种方案:单线程并发——通过事件循环在 I/O 等待时切换任务,实现高并发的同时保持代码的简洁性。

本文将系统讲解:

  • 协程的基础概念与语法
  • 事件循环的运行机制
  • asyncio 的核心 API
  • 并发控制的高级技巧
  • 适用场景与性能调优

异步编程核心概念

同步 vs 异步对比

# 同步代码:任务串行执行
def fetch_user_sync(user_id):
    response = requests.get(f"/api/users/{user_id}")  # 阻塞等待
    return response.json()
​
# 同步执行 3 个请求
for user_id in [1, 2, 3]:
    result = fetch_user_sync(user_id)
    print(result)
# 总时间 = 3 × 单请求时间
​
# 异步代码:任务并发执行
async def fetch_user_async(session, user_id):
    async with session.get(f"/api/users/{user_id}") as response:
        return await response.json()
​
# 异步执行 3 个请求
async def main():
    async with aiohttp.ClientSession() as session:
        tasks = [fetch_user_async(session, uid) for uid in [1, 2, 3]]
        results = await asyncio.gather(*tasks)
        print(results)
# 总时间 ≈ 1 × 单请求时间

核心概念图解

┌─────────────────────────────────────────────────────────────┐
│                      事件循环 (Event Loop)                   │
├─────────────────────────────────────────────────────────────┤
│                                                             │
│   时刻 T1           时刻 T2           时刻 T3              │
│      │                │                │                   │
│      ▼                ▼                ▼                   │
│   ┌──────┐         ┌──────┐         ┌──────┐               │
│   │Task A│         │Task B│         │Task A│               │
│   │等待IO│ ──切──> │执行中│ ──切──> │完成  │               │
│   └──────┘         └──────┘         └──────┘               │
│                      切换点              切换点               │
│                                                             │
│   CPU 在等待 I/O 时,事件循环切换执行其他任务                 │
└─────────────────────────────────────────────────────────────┘

关键术语

术语说明
协程 (Coroutine)可暂停/恢复执行的函数,用 async def 定义
事件循环 (Event Loop)管理协程调度的核心引擎
任务 (Task)协程的封装,包含执行状态
Future代表未来某个时刻会完成的操作
await等待协程完成的关键字
GIL全局解释器锁,asyncio 仍受其影响(CPU密集型)

asyncio 基础语法

async/await 基础

import asyncio
​
# 普通函数 → 协程函数
def sync_function():
    return "同步结果"
​
async def async_function():
    return "异步结果"
​
# 调用协程函数会返回协程对象,不会执行
coro = async_function()
print(coro)  # <coroutine object async_function at 0x...>
​
# 必须用 await 或 asyncio.run() 执行
async def main():
    result = await async_function()
    print(result)  # "异步结果"
​
asyncio.run(main())

asyncio.run() — 程序入口

import asyncio
​
# asyncio.run() 是 asyncio 程序的入口点
# 它会创建事件循环,运行协程,然后关闭循环
​
async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟异步等待
    print("World")
​
# 推荐写法
asyncio.run(hello())
​
# ❌ 不要这样做:混用事件循环
# asyncio.get_event_loop().run_until_complete(hello())

await 的作用

import asyncio
​
async def fetch_data():
    print("开始获取数据...")
    await asyncio.sleep(2)  # 模拟 I/O 等待(不阻塞其他协程)
    return {"data": "result"}
​
async def process():
    # await 暂停当前协程,等待 fetch_data 完成
    result = await fetch_data()
    print(f"获取到: {result}")
​
async def main():
    await process()
​
asyncio.run(main())
# 输出:
# 开始获取数据...
# (1秒后)
# 获取到: {'data': 'result'}

协程与任务

asyncio.create_task() — 并发执行

import asyncio
import time
​
async def task_a():
    await asyncio.sleep(2)
    return "A 完成"
​
async def task_b():
    await asyncio.sleep(1)
    return "B 完成"
​
async def main():
    # ❌ 串行执行
    # result_a = await task_a()  # 等待 2 秒
    # result_b = await task_b()  # 再等 1 秒
    # 总时间 = 3 秒
    
    # ✅ 并发执行
    task1 = asyncio.create_task(task_a())  # 立即开始
    task2 = asyncio.create_task(task_b())  # 立即开始
    
    # 同时等待两个任务
    result_a, result_b = await asyncio.gather(task1, task2)
    print(result_a, result_b)
    # 总时间 ≈ 2 秒(最长任务的时间)
​
start = time.time()
asyncio.run(main())
print(f"总耗时: {time.time() - start:.2f}s")

asyncio.gather() — 批量收集结果

import asyncio
​
async def fetch(url):
    await asyncio.sleep(1)
    return f"完成: {url}"
​
async def main():
    # 方案1:gather 收集结果
    results = await asyncio.gather(
        fetch("https://a.com"),
        fetch("https://b.com"),
        fetch("https://c.com"),
    )
    print(results)  # ['完成: https://a.com', '完成: https://b.com', '完成: https://c.com']
    
    # 方案2:gather 返回异常(不抛异常)
    results = await asyncio.gather(
        fetch("https://a.com"),
        fetch("https://b.com"),
        raise_error(),  # 会抛异常的协程
        return_exceptions=True  # 捕获异常作为返回值
    )
    print(results)  # ['完成: https://a.com', '完成: https://b.com', RuntimeError(...)]
    
    # 方案3:as_completed 逐个处理结果
    async def fetch_with_name(name):
        await asyncio.sleep(1)
        return name
    
    tasks = [fetch_with_name(name) for name in ["A", "B", "C"]]
    
    for completed in asyncio.as_completed(tasks):
        result = await completed
        print(f"完成: {result}")
​
asyncio.run(main())

asyncio.wait() — 条件等待

import asyncio
​
async def long_task(name, seconds):
    await asyncio.sleep(seconds)
    return f"{name} 完成"
​
async def main():
    # wait_until_complete:等待所有任务完成
    done, pending = await asyncio.wait([
        long_task("A", 3),
        long_task("B", 1),
        long_task("C", 2),
    ])
    
    for task in done:
        print(f"完成: {task.result()}")
    
    # wait_for:超时控制
    try:
        result = await asyncio.wait_for(long_task("D", 10), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("任务超时!")
​
# wait 常用参数
async def with_timeout_control():
    # wait_first:返回第一个完成的任务
    first_done, _ = await asyncio.wait([
        long_task("A", 3),
        long_task("B", 1),
    ], return_when=asyncio.FIRST_COMPLETED)
    
    for task in first_done:
        print(f"最快完成: {task.result()}")

asyncio.sleep() vs time.sleep()

import asyncio
import time
​
async def demo():
    # ❌ time.sleep() 会阻塞整个线程
    # await time.sleep(1)  # 不支持!
    
    # ✅ asyncio.sleep() 暂停协程,不阻塞事件循环
    print("开始等待...")
    await asyncio.sleep(2)
    print("等待结束")
​
# 对比:两个任务各等待 2 秒
async def parallel_sleep():
    start = time.time()
    
    # 串行:2 + 2 = 4 秒
    # await asyncio.sleep(2)
    # await asyncio.sleep(2)
    
    # 并行:max(2, 2) = 2 秒
    await asyncio.gather(
        asyncio.sleep(2),
        asyncio.sleep(2),
    )
    
    print(f"并行等待耗时: {time.time() - start:.2f}s")
​
asyncio.run(parallel_sleep())

异步上下文管理器

async with — 异步资源管理

import asyncio
​
# 异步上下文管理器需要实现 __aenter__ 和 __aexit__
class AsyncResource:
    def __init__(self, name):
        self.name = name
    
    async def __aenter__(self):
        print(f"获取资源: {self.name}")
        await asyncio.sleep(0.1)  # 模拟连接建立
        return self
    
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print(f"释放资源: {self.name}")
        await asyncio.sleep(0.1)  # 模拟清理
        return False  # 不抑制异常
​
async def main():
    async with AsyncResource("数据库连接") as resource:
        print(f"使用资源: {resource.name}")
        await asyncio.sleep(1)
    # 自动清理
​
asyncio.run(main())
# 输出:
# 获取资源: 数据库连接
# 使用资源: 数据库连接
# 释放资源: 数据库连接

@contextlib.asynccontextmanager

import asyncio
import contextlib
​
@contextlib.asynccontextmanager
async def async_timer(name):
    """异步计时器上下文管理器"""
    print(f"[{name}] 开始")
    start = asyncio.get_event_loop().time()
    
    try:
        yield name
    finally:
        elapsed = asyncio.get_event_loop().time() - start
        print(f"[{name}] 耗时 {elapsed:.2f}s")
​
async def main():
    async with async_timer("任务A"):
        await asyncio.sleep(1)
    
    async with async_timer("任务B"):
        await asyncio.sleep(0.5)
​
asyncio.run(main())

异步队列 — 生产者/消费者

import asyncio
​
async def producer(queue):
    """生产者:生成数据"""
    for i in range(10):
        await queue.put(i)
        print(f"生产: {i}")
        await asyncio.sleep(0.1)
    await queue.put(None)  # 结束信号
​
async def consumer(queue, name):
    """消费者:处理数据"""
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f"[{name}] 消费: {item}")
        await asyncio.sleep(0.2)
        queue.task_done()
​
async def main():
    queue = asyncio.Queue()
    
    # 启动多个消费者
    await asyncio.gather(
        producer(queue),
        consumer(queue, "消费者1"),
        consumer(queue, "消费者2"),
    )
​
asyncio.run(main())

并发控制

信号量 — 控制并发数

import asyncio
​
# 场景:限制同时下载数量为 3
async def download(url, semaphore):
    async with semaphore:
        print(f"开始下载: {url}")
        await asyncio.sleep(2)  # 模拟下载
        print(f"完成下载: {url}")
        return url
​
async def main():
    semaphore = asyncio.Semaphore(3)  # 最多同时 3 个任务
    
    urls = [f"https://example.com/file{i}" for i in range(10)]
    
    tasks = [download(url, semaphore) for url in urls]
    await asyncio.gather(*tasks)
​
asyncio.run(main())

asyncio.Lock — 异步锁

import asyncio
​
# ❌ 普通锁在异步中无效
# lock = threading.Lock()  # 多线程锁
​
# ✅ 使用 asyncio.Lock
class AsyncCounter:
    def __init__(self):
        self.count = 0
        self.lock = asyncio.Lock()
    
    async def increment(self):
        async with self.lock:  # 保护临界区
            self.count += 1
            await asyncio.sleep(0.1)  # 模拟处理
            return self.count
​
async def main():
    counter = AsyncCounter()
    
    tasks = [counter.increment() for _ in range(10)]
    results = await asyncio.gather(*tasks)
    
    print(f"最终计数: {counter.count}")
    print(f"所有结果: {results}")
​
asyncio.run(main())  # 结果正确:10

asyncio.Event — 事件通知

import asyncio
​
async def waiter(event, name):
    print(f"[{name}] 等待事件...")
    await event.wait()  # 阻塞直到事件被设置
    print(f"[{name}] 事件触发!")
​
async def setter(event):
    await asyncio.sleep(2)
    print("设置事件")
    event.set()  # 触发事件
    await asyncio.sleep(1)
    event.clear()  # 可选:重置事件
​
async def main():
    event = asyncio.Event()
    
    await asyncio.gather(
        waiter(event, "等待者A"),
        waiter(event, "等待者B"),
        waiter(event, "等待者C"),
        setter(event),
    )
​
asyncio.run(main())
# 输出:
# [等待者A] 等待事件...
# [等待者B] 等待事件...
# [等待者C] 等待事件...
# (2秒后)
# 设置事件
# [等待者A] 事件触发!
# [等待者B] 事件触发!
# [等待者C] 事件触发!

asyncio.Condition — 条件变量

import asyncio
​
class AsyncBuffer:
    def __init__(self, max_size=3):
        self.buffer = []
        self.max_size = max_size
        self.condition = asyncio.Condition()
    
    async def produce(self, item):
        async with self.condition:
            while len(self.buffer) >= self.max_size:
                await self.condition.wait()  # 等待缓冲区不满
            self.buffer.append(item)
            print(f"生产: {item}")
            self.condition.notify_all()  # 通知消费者
    
    async def consume(self):
        async with self.condition:
            while len(self.buffer) == 0:
                await self.condition.wait()  # 等待有数据
            item = self.buffer.pop(0)
            print(f"消费: {item}")
            self.condition.notify_all()  # 通知生产者
            return item
​
async def main():
    buffer = AsyncBuffer()
    
    producers = [buffer.produce(i) for i in range(6)]
    consumers = [buffer.consume() for _ in range(6)]
    
    await asyncio.gather(*producers, *consumers)
​
asyncio.run(main())

异步迭代器与生成器

async for — 异步迭代

import asyncio
​
class AsyncCounter:
    """异步迭代器"""
    def __init__(self, limit):
        self.limit = limit
        self.current = 0
    
    def __aiter__(self):
        return self
    
    async def __anext__(self):
        if self.current >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.1)
        value = self.current
        self.current += 1
        return value
​
async def main():
    async for i in AsyncCounter(5):
        print(f"计数: {i}")
​
asyncio.run(main())

async generator — 异步生成器

import asyncio
​
async def async_range(start, end, delay=0.1):
    """异步生成器"""
    for i in range(start, end):
        await asyncio.sleep(delay)
        yield i
​
async def main():
    # 使用 async for 遍历
    async for i in async_range(0, 5):
        print(f"异步生成: {i}")
    
    # 收集为列表
    items = [x async for x in async_range(0, 3)]
    print(f"收集结果: {items}")
​
asyncio.run(main())

异步字典推导式

import asyncio
​
async def fetch_price(symbol):
    await asyncio.sleep(0.1)
    return {"symbol": symbol, "price": 100}
​
async def main():
    symbols = ["AAPL", "GOOGL", "MSFT"]
    
    # 异步字典推导式
    prices = {s: await fetch_price(s) for s in symbols}
    print(prices)
    
    # 异步列表推导式
    results = [await fetch_price(s) for s in symbols]
    print(results)
​
asyncio.run(main())

实战场景

场景1:异步 HTTP 请求(aiohttp)

import asyncio
import aiohttp
​
async def fetch_all(urls):
    """并发获取多个 URL"""
    async with aiohttp.ClientSession() as session:
        async def fetch(url):
            async with session.get(url) as response:
                return await response.text()
        
        tasks = [fetch(url) for url in urls]
        return await asyncio.gather(*tasks)
​
async def fetch_with_retry(url, max_retries=3):
    """带重试的请求"""
    for attempt in range(max_retries):
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url) as response:
                    if response.status == 200:
                        return await response.json()
                    raise aiohttp.ClientError(f"HTTP {response.status}")
        except Exception as e:
            if attempt == max_retries - 1:
                raise
            await asyncio.sleep(2 ** attempt)  # 指数退避
​
async def main():
    urls = [
        "https://jsonplaceholder.typicode.com/posts/1",
        "https://jsonplaceholder.typicode.com/posts/2",
        "https://jsonplaceholder.typicode.com/posts/3",
    ]
    
    results = await fetch_all(urls)
    for i, result in enumerate(results):
        print(f"响应 {i+1}: {len(result)} 字符")
​
asyncio.run(main())

场景2:异步数据库操作(aiomysql)

import asyncio
import aiomysql
​
async def fetch_users(pool, user_ids):
    """并发查询用户"""
    async with pool.acquire() as conn:
        async with conn.cursor(aiomysql.DictCursor) as cur:
            # 构造 IN 查询
            placeholders = ','.join(['%s'] * len(user_ids))
            sql = f"SELECT * FROM users WHERE id IN ({placeholders})"
            await cur.execute(sql, user_ids)
            return await cur.fetchall()
​
async def batch_insert(pool, users):
    """批量插入用户"""
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
            await cur.executemany(sql, users)
            await conn.commit()
​
async def main():
    pool = await aiomysql.create_pool(
        host='localhost',
        port=3306,
        user='root',
        password='password',
        db='myapp',
        minsize=5,
        maxsize=10,
    )
    
    try:
        # 查询
        users = await fetch_users(pool, [1, 2, 3])
        print(f"查询到 {len(users)} 个用户")
        
        # 插入
        new_users = [
            ('Alice', 'alice@example.com'),
            ('Bob', 'bob@example.com'),
        ]
        await batch_insert(pool, new_users)
        
    finally:
        pool.close()
        await pool.wait_closed()
​
asyncio.run(main())

场景3:异步文件操作

import asyncio
import aiofiles
import aiofiles.os
​
async def read_file(path):
    """异步读取文件"""
    async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
        return await f.read()
​
async def write_file(path, content):
    """异步写入文件"""
    async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
        await f.write(content)
​
async def batch_process_files(input_dir, output_dir):
    """批量异步处理文件"""
    # 获取所有文件
    files = await aiofiles.os.listdir(input_dir)
    
    async def process(file):
        content = await read_file(f"{input_dir}/{file}")
        processed = content.upper()  # 示例处理
        await write_file(f"{output_dir}/{file}", processed)
        return file
    
    # 并发处理(限制并发数为 10)
    semaphore = asyncio.Semaphore(10)
    
    async def limited_process(file):
        async with semaphore:
            return await process(file)
    
    tasks = [limited_process(f) for f in files if f.endswith('.txt')]
    completed = await asyncio.gather(*tasks)
    
    print(f"处理完成: {len(completed)} 个文件")
​
asyncio.run(main())

场景4:异步定时任务

import asyncio
from datetime import datetime
​
async def periodic_task(interval, callback):
    """周期性执行任务"""
    while True:
        await asyncio.sleep(interval)
        await callback()
​
async def job():
    print(f"[{datetime.now().strftime('%H:%M:%S')}] 执行定时任务")
​
async def main():
    # 每 5 秒执行一次
    task = asyncio.create_task(periodic_task(5, job))
    
    # 运行 20 秒后取消
    await asyncio.sleep(20)
    task.cancel()
    
    try:
        await task
    except asyncio.CancelledError:
        print("定时任务已取消")
​
asyncio.run(main())

异步 vs 多线程 vs 多进程

性能对比

特性asyncio多线程多进程
并发模型单线程协作式多线程抢占式多进程独立
GIL 影响CPU 密集型受限CPU 密集型受限不受 GIL 影响
I/O 效率⭐⭐⭐⭐⭐⭐⭐⭐⭐
CPU 密集型⭐⭐⭐⭐⭐
内存占用⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
复杂度⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐⭐
适用场景I/O 密集I/O + CPU 混合CPU 密集

选型指南

# ✅ asyncio 适用场景
# - HTTP API 调用(爬虫、聚合服务)
# - 数据库查询(大量读操作)
# - 文件 I/O(批量处理)
# - WebSocket 连接管理
# - 定时任务调度
​
# ✅ 多线程适用场景
# - 涉及同步库(无法异步化)
# - 需要真正的并行计算
# - CPU 密集 + I/O 密集混合
​
# ✅ 多进程适用场景
# - CPU 密集型计算
# - 绑定多核 CPU
# - 需要独立内存空间

混合使用示例

import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
​
def cpu_intensive_task(data):
    """CPU 密集型任务(必须在进程池中运行)"""
    # 复杂计算
    return sum(i * i for i in range(data))
​
async def main():
    loop = asyncio.get_event_loop()
    
    # 创建进程池
    with ProcessPoolExecutor(max_workers=4) as pool:
        # 在进程池中运行 CPU 密集型任务
        tasks = [
            loop.run_in_executor(pool, cpu_intensive_task, i)
            for i in range(10)
        ]
        
        # 异步等待所有进程完成
        results = await asyncio.gather(*tasks)
        print(results)
​
asyncio.run(main())

排错指南

常见错误与解决方案

错误1:协程未执行

# ❌ 常见错误:协程对象创建后未执行
async def my_coro():
    return "结果"
​
result = my_coro()  # 创建协程对象,但没有执行
print(result)  # <coroutine object ...>
​
# ✅ 正确做法
async def main():
    result = await my_coro()
    print(result)
​
asyncio.run(main())
​
# 或使用 ensure_future
async def main():
    task = asyncio.create_task(my_coro())
    result = await task
    print(result)
​
asyncio.run(main())

错误2:死锁 — 在同步代码中 await

import asyncio
​
def sync_function():
    # ❌ 同步函数中不能使用 await
    result = await asyncio.sleep(1)  # SyntaxError!
    return result
​
# ✅ 正确:在异步函数中调用
async def async_caller():
    await asyncio.sleep(1)
    return "完成"
​
# 如果必须从同步代码调用异步函数
# 方案1:使用 asyncio.run
def wrapper():
    return asyncio.run(async_caller())
​
# 方案2:使用 run_in_executor
def wrapper():
    loop = asyncio.new_event_loop()
    return loop.run_until_complete(async_caller())
​
# 方案3:在已有事件循环中调度
def wrapper(loop):
    return loop.run_until_complete(async_caller())

错误3:忘记 await 导致静默失败

import asyncio
​
async def fetch_data():
    print("获取数据")
    await asyncio.sleep(1)
    return {"data": "value"}
​
async def main():
    # ❌ 忘记 await,结果丢失
    fetch_data()  # 协程被创建但从未执行
    print("完成")
    
    # ✅ 正确
    result = await fetch_data()
    print(f"结果: {result}")
​
asyncio.run(main())

错误4:异常未被捕获

import asyncio
​
async def risky_task():
    raise ValueError("任务失败")
​
async def main():
    # ❌ gather 默认会传播异常
    try:
        await asyncio.gather(
            risky_task(),
            asyncio.sleep(1),
        )
    except ValueError as e:
        print(f"捕获异常: {e}")
    
    # ✅ 使用 return_exceptions 捕获所有异常
    results = await asyncio.gather(
        risky_task(),
        asyncio.sleep(1),
        return_exceptions=True,
    )
    print(f"结果: {results}")  # [ValueError(...), None]
​
asyncio.run(main())

错误5:资源泄漏

import asyncio
​
async def bad_resource_management():
    # ❌ 每次请求都创建新 session,用完不关闭
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()
    # 如果上面抛异常,session 不会被清理
​
# ✅ 使用 try/finally 确保清理
async def good_resource_management(url):
    session = aiohttp.ClientSession()
    try:
        async with session.get(url) as resp:
            return await resp.json()
    finally:
        await session.close()
​
# ✅ 或使用上下文管理器(更简洁)
async def best_resource_management(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.json()

错误6:事件循环冲突

import asyncio
​
# ❌ 在已有事件循环的线程中创建新循环
import threading
​
def thread_task():
    loop = asyncio.new_event_loop()  # 创建新循环
    asyncio.set_event_loop(loop)    # 设置为主循环
    loop.run_until_complete(coro())
    loop.close()
​
# 主线程可能已经有事件循环,导致冲突
​
# ✅ 正确:在新线程中运行
def run_in_thread():
    asyncio.run(coro())  # asyncio.run 会创建新循环
​
# ✅ 或使用 get_running_loop
async def run_in_existing():
    loop = asyncio.get_running_loop()
    # 在已有循环中调度任务

调试技巧

import asyncio
​
# 1. 打印任务状态
async def debug_tasks():
    task = asyncio.create_task(asyncio.sleep(1))
    print(f"Task state: {task.get_name()} = {task.get_stack()}")
​
# 2. 所有任务完成后再退出
async def wait_all():
    pending = asyncio.all_tasks()
    await asyncio.gather(*pending, return_exceptions=True)
​
# 3. 捕获未处理的协程异常
async def main():
    loop = asyncio.get_event_loop()
    loop.set_exception_handler(lambda loop, context: print(f"异常: {context}"))
    
    asyncio.create_task(asyncio.sleep(1))
    asyncio.create_task(raise_error())  # 会触发异常处理器
​
# 4. 使用 asyncio.current_task() 获取当前任务
async def log_current():
    task = asyncio.current_task()
    print(f"当前任务: {task.get_name()}")
​
# 5. 性能分析
async def profile_async():
    import time
    start = time.perf_counter()
    
    # ... 异步代码 ...
    
    elapsed = time.perf_counter() - start
    print(f"耗时: {elapsed:.3f}s")

总结

核心要点

  1. async/await 是语法糖:协程是 asyncio 的基础,await 暂停协程而非阻塞线程
  2. 单线程并发:asyncio 是单线程协作式并发,不受 GIL 影响的场景才能发挥优势
  3. 适用 I/O 密集型:网络请求、文件 I/O、数据库查询
  4. 避免混用:不要在异步代码中调用同步阻塞函数

速查表

┌─────────────────────────────────────────────────────────────────┐
│                    asyncio 速查表                               │
├──────────────────┬──────────────────────────────────────────────┤
│     场景         │                      用法                    │
├──────────────────┼──────────────────────────────────────────────┤
│ 运行协程         │ asyncio.run(coro())                         │
│ 并发执行         │ asyncio.gather(*tasks)                      │
│ 创建任务         │ asyncio.create_task(coro())                 │
│ 超时控制         │ asyncio.wait_for(coro(), timeout=5)         │
│ 并发限制         │ asyncio.Semaphore(3)                        │
│ 异步锁           │ asyncio.Lock()                              │
│ 事件通知         │ asyncio.Event()                             │
│ 条件变量         │ asyncio.Condition()                         │
│ 异步迭代         │ async for item in async_iter:              │
│ 异步生成器       │ async def gen(): yield item                │
│ 异步上下文       │ async with AsyncResource() as r:           │
└──────────────────┴──────────────────────────────────────────────┘

最佳实践清单

检查项说明
✅ 使用 asyncio.run()程序入口,不要混用事件循环
await 所有协程避免创建协程后忘记执行
async with 管理资源确保资源正确释放
✅ 设置超时wait_for 防止无限等待
✅ 信号量限制并发避免资源耗尽
return_exceptions=Truegather 时捕获所有异常
✅ 隔离 CPU 密集型使用 run_in_executor

💡 提示: asyncio 适合 I/O 密集型并发,对于 CPU 密集型任务,考虑使用 concurrent.futures.ProcessPoolExecutor 或专门的并行计算库(如 multiprocessingjoblib)。 📚 扩展阅读:


原创内容,版权所有。未经授权,禁止转载。

© 版权声明
THE END
喜欢就支持一下吧
点赞8 分享
评论 抢沙发

请登录后发表评论

    暂无评论内容