前言
同步代码执行时,CPU 经常处于”空闲等待”状态——等待网络响应、等待文件读取、等待数据库查询。传统的多线程/多进程虽然能解决这个问题,但带来了复杂性:线程安全、死锁、资源竞争。
asyncio 提供了第三种方案:单线程并发——通过事件循环在 I/O 等待时切换任务,实现高并发的同时保持代码的简洁性。
本文将系统讲解:
- 协程的基础概念与语法
- 事件循环的运行机制
- asyncio 的核心 API
- 并发控制的高级技巧
- 适用场景与性能调优
异步编程核心概念
同步 vs 异步对比
# 同步代码:任务串行执行
def fetch_user_sync(user_id):
response = requests.get(f"/api/users/{user_id}") # 阻塞等待
return response.json()
# 同步执行 3 个请求
for user_id in [1, 2, 3]:
result = fetch_user_sync(user_id)
print(result)
# 总时间 = 3 × 单请求时间
# 异步代码:任务并发执行
async def fetch_user_async(session, user_id):
async with session.get(f"/api/users/{user_id}") as response:
return await response.json()
# 异步执行 3 个请求
async def main():
async with aiohttp.ClientSession() as session:
tasks = [fetch_user_async(session, uid) for uid in [1, 2, 3]]
results = await asyncio.gather(*tasks)
print(results)
# 总时间 ≈ 1 × 单请求时间
核心概念图解
┌─────────────────────────────────────────────────────────────┐
│ 事件循环 (Event Loop) │
├─────────────────────────────────────────────────────────────┤
│ │
│ 时刻 T1 时刻 T2 时刻 T3 │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────┐ ┌──────┐ ┌──────┐ │
│ │Task A│ │Task B│ │Task A│ │
│ │等待IO│ ──切──> │执行中│ ──切──> │完成 │ │
│ └──────┘ └──────┘ └──────┘ │
│ 切换点 切换点 │
│ │
│ CPU 在等待 I/O 时,事件循环切换执行其他任务 │
└─────────────────────────────────────────────────────────────┘
关键术语
| 术语 | 说明 |
|---|---|
| 协程 (Coroutine) | 可暂停/恢复执行的函数,用 async def 定义 |
| 事件循环 (Event Loop) | 管理协程调度的核心引擎 |
| 任务 (Task) | 协程的封装,包含执行状态 |
| Future | 代表未来某个时刻会完成的操作 |
| await | 等待协程完成的关键字 |
| GIL | 全局解释器锁,asyncio 仍受其影响(CPU密集型) |
asyncio 基础语法
async/await 基础
import asyncio
# 普通函数 → 协程函数
def sync_function():
return "同步结果"
async def async_function():
return "异步结果"
# 调用协程函数会返回协程对象,不会执行
coro = async_function()
print(coro) # <coroutine object async_function at 0x...>
# 必须用 await 或 asyncio.run() 执行
async def main():
result = await async_function()
print(result) # "异步结果"
asyncio.run(main())
asyncio.run() — 程序入口
import asyncio
# asyncio.run() 是 asyncio 程序的入口点
# 它会创建事件循环,运行协程,然后关闭循环
async def hello():
print("Hello")
await asyncio.sleep(1) # 模拟异步等待
print("World")
# 推荐写法
asyncio.run(hello())
# ❌ 不要这样做:混用事件循环
# asyncio.get_event_loop().run_until_complete(hello())
await 的作用
import asyncio
async def fetch_data():
print("开始获取数据...")
await asyncio.sleep(2) # 模拟 I/O 等待(不阻塞其他协程)
return {"data": "result"}
async def process():
# await 暂停当前协程,等待 fetch_data 完成
result = await fetch_data()
print(f"获取到: {result}")
async def main():
await process()
asyncio.run(main())
# 输出:
# 开始获取数据...
# (1秒后)
# 获取到: {'data': 'result'}
协程与任务
asyncio.create_task() — 并发执行
import asyncio
import time
async def task_a():
await asyncio.sleep(2)
return "A 完成"
async def task_b():
await asyncio.sleep(1)
return "B 完成"
async def main():
# ❌ 串行执行
# result_a = await task_a() # 等待 2 秒
# result_b = await task_b() # 再等 1 秒
# 总时间 = 3 秒
# ✅ 并发执行
task1 = asyncio.create_task(task_a()) # 立即开始
task2 = asyncio.create_task(task_b()) # 立即开始
# 同时等待两个任务
result_a, result_b = await asyncio.gather(task1, task2)
print(result_a, result_b)
# 总时间 ≈ 2 秒(最长任务的时间)
start = time.time()
asyncio.run(main())
print(f"总耗时: {time.time() - start:.2f}s")
asyncio.gather() — 批量收集结果
import asyncio
async def fetch(url):
await asyncio.sleep(1)
return f"完成: {url}"
async def main():
# 方案1:gather 收集结果
results = await asyncio.gather(
fetch("https://a.com"),
fetch("https://b.com"),
fetch("https://c.com"),
)
print(results) # ['完成: https://a.com', '完成: https://b.com', '完成: https://c.com']
# 方案2:gather 返回异常(不抛异常)
results = await asyncio.gather(
fetch("https://a.com"),
fetch("https://b.com"),
raise_error(), # 会抛异常的协程
return_exceptions=True # 捕获异常作为返回值
)
print(results) # ['完成: https://a.com', '完成: https://b.com', RuntimeError(...)]
# 方案3:as_completed 逐个处理结果
async def fetch_with_name(name):
await asyncio.sleep(1)
return name
tasks = [fetch_with_name(name) for name in ["A", "B", "C"]]
for completed in asyncio.as_completed(tasks):
result = await completed
print(f"完成: {result}")
asyncio.run(main())
asyncio.wait() — 条件等待
import asyncio
async def long_task(name, seconds):
await asyncio.sleep(seconds)
return f"{name} 完成"
async def main():
# wait_until_complete:等待所有任务完成
done, pending = await asyncio.wait([
long_task("A", 3),
long_task("B", 1),
long_task("C", 2),
])
for task in done:
print(f"完成: {task.result()}")
# wait_for:超时控制
try:
result = await asyncio.wait_for(long_task("D", 10), timeout=3)
print(result)
except asyncio.TimeoutError:
print("任务超时!")
# wait 常用参数
async def with_timeout_control():
# wait_first:返回第一个完成的任务
first_done, _ = await asyncio.wait([
long_task("A", 3),
long_task("B", 1),
], return_when=asyncio.FIRST_COMPLETED)
for task in first_done:
print(f"最快完成: {task.result()}")
asyncio.sleep() vs time.sleep()
import asyncio
import time
async def demo():
# ❌ time.sleep() 会阻塞整个线程
# await time.sleep(1) # 不支持!
# ✅ asyncio.sleep() 暂停协程,不阻塞事件循环
print("开始等待...")
await asyncio.sleep(2)
print("等待结束")
# 对比:两个任务各等待 2 秒
async def parallel_sleep():
start = time.time()
# 串行:2 + 2 = 4 秒
# await asyncio.sleep(2)
# await asyncio.sleep(2)
# 并行:max(2, 2) = 2 秒
await asyncio.gather(
asyncio.sleep(2),
asyncio.sleep(2),
)
print(f"并行等待耗时: {time.time() - start:.2f}s")
asyncio.run(parallel_sleep())
异步上下文管理器
async with — 异步资源管理
import asyncio
# 异步上下文管理器需要实现 __aenter__ 和 __aexit__
class AsyncResource:
def __init__(self, name):
self.name = name
async def __aenter__(self):
print(f"获取资源: {self.name}")
await asyncio.sleep(0.1) # 模拟连接建立
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print(f"释放资源: {self.name}")
await asyncio.sleep(0.1) # 模拟清理
return False # 不抑制异常
async def main():
async with AsyncResource("数据库连接") as resource:
print(f"使用资源: {resource.name}")
await asyncio.sleep(1)
# 自动清理
asyncio.run(main())
# 输出:
# 获取资源: 数据库连接
# 使用资源: 数据库连接
# 释放资源: 数据库连接
@contextlib.asynccontextmanager
import asyncio
import contextlib
@contextlib.asynccontextmanager
async def async_timer(name):
"""异步计时器上下文管理器"""
print(f"[{name}] 开始")
start = asyncio.get_event_loop().time()
try:
yield name
finally:
elapsed = asyncio.get_event_loop().time() - start
print(f"[{name}] 耗时 {elapsed:.2f}s")
async def main():
async with async_timer("任务A"):
await asyncio.sleep(1)
async with async_timer("任务B"):
await asyncio.sleep(0.5)
asyncio.run(main())
异步队列 — 生产者/消费者
import asyncio
async def producer(queue):
"""生产者:生成数据"""
for i in range(10):
await queue.put(i)
print(f"生产: {i}")
await asyncio.sleep(0.1)
await queue.put(None) # 结束信号
async def consumer(queue, name):
"""消费者:处理数据"""
while True:
item = await queue.get()
if item is None:
break
print(f"[{name}] 消费: {item}")
await asyncio.sleep(0.2)
queue.task_done()
async def main():
queue = asyncio.Queue()
# 启动多个消费者
await asyncio.gather(
producer(queue),
consumer(queue, "消费者1"),
consumer(queue, "消费者2"),
)
asyncio.run(main())
并发控制
信号量 — 控制并发数
import asyncio
# 场景:限制同时下载数量为 3
async def download(url, semaphore):
async with semaphore:
print(f"开始下载: {url}")
await asyncio.sleep(2) # 模拟下载
print(f"完成下载: {url}")
return url
async def main():
semaphore = asyncio.Semaphore(3) # 最多同时 3 个任务
urls = [f"https://example.com/file{i}" for i in range(10)]
tasks = [download(url, semaphore) for url in urls]
await asyncio.gather(*tasks)
asyncio.run(main())
asyncio.Lock — 异步锁
import asyncio
# ❌ 普通锁在异步中无效
# lock = threading.Lock() # 多线程锁
# ✅ 使用 asyncio.Lock
class AsyncCounter:
def __init__(self):
self.count = 0
self.lock = asyncio.Lock()
async def increment(self):
async with self.lock: # 保护临界区
self.count += 1
await asyncio.sleep(0.1) # 模拟处理
return self.count
async def main():
counter = AsyncCounter()
tasks = [counter.increment() for _ in range(10)]
results = await asyncio.gather(*tasks)
print(f"最终计数: {counter.count}")
print(f"所有结果: {results}")
asyncio.run(main()) # 结果正确:10
asyncio.Event — 事件通知
import asyncio
async def waiter(event, name):
print(f"[{name}] 等待事件...")
await event.wait() # 阻塞直到事件被设置
print(f"[{name}] 事件触发!")
async def setter(event):
await asyncio.sleep(2)
print("设置事件")
event.set() # 触发事件
await asyncio.sleep(1)
event.clear() # 可选:重置事件
async def main():
event = asyncio.Event()
await asyncio.gather(
waiter(event, "等待者A"),
waiter(event, "等待者B"),
waiter(event, "等待者C"),
setter(event),
)
asyncio.run(main())
# 输出:
# [等待者A] 等待事件...
# [等待者B] 等待事件...
# [等待者C] 等待事件...
# (2秒后)
# 设置事件
# [等待者A] 事件触发!
# [等待者B] 事件触发!
# [等待者C] 事件触发!
asyncio.Condition — 条件变量
import asyncio
class AsyncBuffer:
def __init__(self, max_size=3):
self.buffer = []
self.max_size = max_size
self.condition = asyncio.Condition()
async def produce(self, item):
async with self.condition:
while len(self.buffer) >= self.max_size:
await self.condition.wait() # 等待缓冲区不满
self.buffer.append(item)
print(f"生产: {item}")
self.condition.notify_all() # 通知消费者
async def consume(self):
async with self.condition:
while len(self.buffer) == 0:
await self.condition.wait() # 等待有数据
item = self.buffer.pop(0)
print(f"消费: {item}")
self.condition.notify_all() # 通知生产者
return item
async def main():
buffer = AsyncBuffer()
producers = [buffer.produce(i) for i in range(6)]
consumers = [buffer.consume() for _ in range(6)]
await asyncio.gather(*producers, *consumers)
asyncio.run(main())
异步迭代器与生成器
async for — 异步迭代
import asyncio
class AsyncCounter:
"""异步迭代器"""
def __init__(self, limit):
self.limit = limit
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.1)
value = self.current
self.current += 1
return value
async def main():
async for i in AsyncCounter(5):
print(f"计数: {i}")
asyncio.run(main())
async generator — 异步生成器
import asyncio
async def async_range(start, end, delay=0.1):
"""异步生成器"""
for i in range(start, end):
await asyncio.sleep(delay)
yield i
async def main():
# 使用 async for 遍历
async for i in async_range(0, 5):
print(f"异步生成: {i}")
# 收集为列表
items = [x async for x in async_range(0, 3)]
print(f"收集结果: {items}")
asyncio.run(main())
异步字典推导式
import asyncio
async def fetch_price(symbol):
await asyncio.sleep(0.1)
return {"symbol": symbol, "price": 100}
async def main():
symbols = ["AAPL", "GOOGL", "MSFT"]
# 异步字典推导式
prices = {s: await fetch_price(s) for s in symbols}
print(prices)
# 异步列表推导式
results = [await fetch_price(s) for s in symbols]
print(results)
asyncio.run(main())
实战场景
场景1:异步 HTTP 请求(aiohttp)
import asyncio
import aiohttp
async def fetch_all(urls):
"""并发获取多个 URL"""
async with aiohttp.ClientSession() as session:
async def fetch(url):
async with session.get(url) as response:
return await response.text()
tasks = [fetch(url) for url in urls]
return await asyncio.gather(*tasks)
async def fetch_with_retry(url, max_retries=3):
"""带重试的请求"""
for attempt in range(max_retries):
try:
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 200:
return await response.json()
raise aiohttp.ClientError(f"HTTP {response.status}")
except Exception as e:
if attempt == max_retries - 1:
raise
await asyncio.sleep(2 ** attempt) # 指数退避
async def main():
urls = [
"https://jsonplaceholder.typicode.com/posts/1",
"https://jsonplaceholder.typicode.com/posts/2",
"https://jsonplaceholder.typicode.com/posts/3",
]
results = await fetch_all(urls)
for i, result in enumerate(results):
print(f"响应 {i+1}: {len(result)} 字符")
asyncio.run(main())
场景2:异步数据库操作(aiomysql)
import asyncio
import aiomysql
async def fetch_users(pool, user_ids):
"""并发查询用户"""
async with pool.acquire() as conn:
async with conn.cursor(aiomysql.DictCursor) as cur:
# 构造 IN 查询
placeholders = ','.join(['%s'] * len(user_ids))
sql = f"SELECT * FROM users WHERE id IN ({placeholders})"
await cur.execute(sql, user_ids)
return await cur.fetchall()
async def batch_insert(pool, users):
"""批量插入用户"""
async with pool.acquire() as conn:
async with conn.cursor() as cur:
sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
await cur.executemany(sql, users)
await conn.commit()
async def main():
pool = await aiomysql.create_pool(
host='localhost',
port=3306,
user='root',
password='password',
db='myapp',
minsize=5,
maxsize=10,
)
try:
# 查询
users = await fetch_users(pool, [1, 2, 3])
print(f"查询到 {len(users)} 个用户")
# 插入
new_users = [
('Alice', 'alice@example.com'),
('Bob', 'bob@example.com'),
]
await batch_insert(pool, new_users)
finally:
pool.close()
await pool.wait_closed()
asyncio.run(main())
场景3:异步文件操作
import asyncio
import aiofiles
import aiofiles.os
async def read_file(path):
"""异步读取文件"""
async with aiofiles.open(path, mode='r', encoding='utf-8') as f:
return await f.read()
async def write_file(path, content):
"""异步写入文件"""
async with aiofiles.open(path, mode='w', encoding='utf-8') as f:
await f.write(content)
async def batch_process_files(input_dir, output_dir):
"""批量异步处理文件"""
# 获取所有文件
files = await aiofiles.os.listdir(input_dir)
async def process(file):
content = await read_file(f"{input_dir}/{file}")
processed = content.upper() # 示例处理
await write_file(f"{output_dir}/{file}", processed)
return file
# 并发处理(限制并发数为 10)
semaphore = asyncio.Semaphore(10)
async def limited_process(file):
async with semaphore:
return await process(file)
tasks = [limited_process(f) for f in files if f.endswith('.txt')]
completed = await asyncio.gather(*tasks)
print(f"处理完成: {len(completed)} 个文件")
asyncio.run(main())
场景4:异步定时任务
import asyncio
from datetime import datetime
async def periodic_task(interval, callback):
"""周期性执行任务"""
while True:
await asyncio.sleep(interval)
await callback()
async def job():
print(f"[{datetime.now().strftime('%H:%M:%S')}] 执行定时任务")
async def main():
# 每 5 秒执行一次
task = asyncio.create_task(periodic_task(5, job))
# 运行 20 秒后取消
await asyncio.sleep(20)
task.cancel()
try:
await task
except asyncio.CancelledError:
print("定时任务已取消")
asyncio.run(main())
异步 vs 多线程 vs 多进程
性能对比
| 特性 | asyncio | 多线程 | 多进程 |
|---|---|---|---|
| 并发模型 | 单线程协作式 | 多线程抢占式 | 多进程独立 |
| GIL 影响 | CPU 密集型受限 | CPU 密集型受限 | 不受 GIL 影响 |
| I/O 效率 | ⭐⭐⭐⭐⭐ | ⭐⭐ | ⭐⭐ |
| CPU 密集型 | ⭐ | ⭐ | ⭐⭐⭐⭐⭐ |
| 内存占用 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | ⭐⭐ |
| 复杂度 | ⭐⭐⭐ | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ |
| 适用场景 | I/O 密集 | I/O + CPU 混合 | CPU 密集 |
选型指南
# ✅ asyncio 适用场景
# - HTTP API 调用(爬虫、聚合服务)
# - 数据库查询(大量读操作)
# - 文件 I/O(批量处理)
# - WebSocket 连接管理
# - 定时任务调度
# ✅ 多线程适用场景
# - 涉及同步库(无法异步化)
# - 需要真正的并行计算
# - CPU 密集 + I/O 密集混合
# ✅ 多进程适用场景
# - CPU 密集型计算
# - 绑定多核 CPU
# - 需要独立内存空间
混合使用示例
import asyncio
from concurrent.futures import ProcessPoolExecutor
import multiprocessing
def cpu_intensive_task(data):
"""CPU 密集型任务(必须在进程池中运行)"""
# 复杂计算
return sum(i * i for i in range(data))
async def main():
loop = asyncio.get_event_loop()
# 创建进程池
with ProcessPoolExecutor(max_workers=4) as pool:
# 在进程池中运行 CPU 密集型任务
tasks = [
loop.run_in_executor(pool, cpu_intensive_task, i)
for i in range(10)
]
# 异步等待所有进程完成
results = await asyncio.gather(*tasks)
print(results)
asyncio.run(main())
排错指南
常见错误与解决方案
错误1:协程未执行
# ❌ 常见错误:协程对象创建后未执行
async def my_coro():
return "结果"
result = my_coro() # 创建协程对象,但没有执行
print(result) # <coroutine object ...>
# ✅ 正确做法
async def main():
result = await my_coro()
print(result)
asyncio.run(main())
# 或使用 ensure_future
async def main():
task = asyncio.create_task(my_coro())
result = await task
print(result)
asyncio.run(main())
错误2:死锁 — 在同步代码中 await
import asyncio
def sync_function():
# ❌ 同步函数中不能使用 await
result = await asyncio.sleep(1) # SyntaxError!
return result
# ✅ 正确:在异步函数中调用
async def async_caller():
await asyncio.sleep(1)
return "完成"
# 如果必须从同步代码调用异步函数
# 方案1:使用 asyncio.run
def wrapper():
return asyncio.run(async_caller())
# 方案2:使用 run_in_executor
def wrapper():
loop = asyncio.new_event_loop()
return loop.run_until_complete(async_caller())
# 方案3:在已有事件循环中调度
def wrapper(loop):
return loop.run_until_complete(async_caller())
错误3:忘记 await 导致静默失败
import asyncio
async def fetch_data():
print("获取数据")
await asyncio.sleep(1)
return {"data": "value"}
async def main():
# ❌ 忘记 await,结果丢失
fetch_data() # 协程被创建但从未执行
print("完成")
# ✅ 正确
result = await fetch_data()
print(f"结果: {result}")
asyncio.run(main())
错误4:异常未被捕获
import asyncio
async def risky_task():
raise ValueError("任务失败")
async def main():
# ❌ gather 默认会传播异常
try:
await asyncio.gather(
risky_task(),
asyncio.sleep(1),
)
except ValueError as e:
print(f"捕获异常: {e}")
# ✅ 使用 return_exceptions 捕获所有异常
results = await asyncio.gather(
risky_task(),
asyncio.sleep(1),
return_exceptions=True,
)
print(f"结果: {results}") # [ValueError(...), None]
asyncio.run(main())
错误5:资源泄漏
import asyncio
async def bad_resource_management():
# ❌ 每次请求都创建新 session,用完不关闭
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
# 如果上面抛异常,session 不会被清理
# ✅ 使用 try/finally 确保清理
async def good_resource_management(url):
session = aiohttp.ClientSession()
try:
async with session.get(url) as resp:
return await resp.json()
finally:
await session.close()
# ✅ 或使用上下文管理器(更简洁)
async def best_resource_management(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as resp:
return await resp.json()
错误6:事件循环冲突
import asyncio
# ❌ 在已有事件循环的线程中创建新循环
import threading
def thread_task():
loop = asyncio.new_event_loop() # 创建新循环
asyncio.set_event_loop(loop) # 设置为主循环
loop.run_until_complete(coro())
loop.close()
# 主线程可能已经有事件循环,导致冲突
# ✅ 正确:在新线程中运行
def run_in_thread():
asyncio.run(coro()) # asyncio.run 会创建新循环
# ✅ 或使用 get_running_loop
async def run_in_existing():
loop = asyncio.get_running_loop()
# 在已有循环中调度任务
调试技巧
import asyncio
# 1. 打印任务状态
async def debug_tasks():
task = asyncio.create_task(asyncio.sleep(1))
print(f"Task state: {task.get_name()} = {task.get_stack()}")
# 2. 所有任务完成后再退出
async def wait_all():
pending = asyncio.all_tasks()
await asyncio.gather(*pending, return_exceptions=True)
# 3. 捕获未处理的协程异常
async def main():
loop = asyncio.get_event_loop()
loop.set_exception_handler(lambda loop, context: print(f"异常: {context}"))
asyncio.create_task(asyncio.sleep(1))
asyncio.create_task(raise_error()) # 会触发异常处理器
# 4. 使用 asyncio.current_task() 获取当前任务
async def log_current():
task = asyncio.current_task()
print(f"当前任务: {task.get_name()}")
# 5. 性能分析
async def profile_async():
import time
start = time.perf_counter()
# ... 异步代码 ...
elapsed = time.perf_counter() - start
print(f"耗时: {elapsed:.3f}s")
总结
核心要点
- async/await 是语法糖:协程是 asyncio 的基础,await 暂停协程而非阻塞线程
- 单线程并发:asyncio 是单线程协作式并发,不受 GIL 影响的场景才能发挥优势
- 适用 I/O 密集型:网络请求、文件 I/O、数据库查询
- 避免混用:不要在异步代码中调用同步阻塞函数
速查表
┌─────────────────────────────────────────────────────────────────┐
│ asyncio 速查表 │
├──────────────────┬──────────────────────────────────────────────┤
│ 场景 │ 用法 │
├──────────────────┼──────────────────────────────────────────────┤
│ 运行协程 │ asyncio.run(coro()) │
│ 并发执行 │ asyncio.gather(*tasks) │
│ 创建任务 │ asyncio.create_task(coro()) │
│ 超时控制 │ asyncio.wait_for(coro(), timeout=5) │
│ 并发限制 │ asyncio.Semaphore(3) │
│ 异步锁 │ asyncio.Lock() │
│ 事件通知 │ asyncio.Event() │
│ 条件变量 │ asyncio.Condition() │
│ 异步迭代 │ async for item in async_iter: │
│ 异步生成器 │ async def gen(): yield item │
│ 异步上下文 │ async with AsyncResource() as r: │
└──────────────────┴──────────────────────────────────────────────┘
最佳实践清单
| 检查项 | 说明 |
|---|---|
✅ 使用 asyncio.run() | 程序入口,不要混用事件循环 |
✅ await 所有协程 | 避免创建协程后忘记执行 |
✅ async with 管理资源 | 确保资源正确释放 |
| ✅ 设置超时 | wait_for 防止无限等待 |
| ✅ 信号量限制并发 | 避免资源耗尽 |
✅ return_exceptions=True | gather 时捕获所有异常 |
| ✅ 隔离 CPU 密集型 | 使用 run_in_executor |
💡 提示: asyncio 适合 I/O 密集型并发,对于 CPU 密集型任务,考虑使用
concurrent.futures.ProcessPoolExecutor或专门的并行计算库(如multiprocessing、joblib)。 📚 扩展阅读:
原创内容,版权所有。未经授权,禁止转载。
© 版权声明
文章版权归作者所有,未经允许请勿转载。
THE END













暂无评论内容