本文目录导读:

我来为你详细讲解Python中async/await异步编程的使用方法,配合实际案例。
基础概念
什么是异步编程?
异步编程允许程序在等待I/O操作(如网络请求、文件读写)时,切换执行其他任务,提高程序效率。
基础案例
案例1:最基础的异步函数
import asyncio
# 定义异步函数
async def hello():
print("Hello")
await asyncio.sleep(1) # 模拟耗时操作
print("World")
# 运行异步函数
asyncio.run(hello())
案例2:多个异步任务并发执行
import asyncio
import time
async def fetch_data(delay, name):
"""模拟网络请求"""
print(f"开始获取{name}数据...")
await asyncio.sleep(delay) # 模拟网络延迟
print(f"{name}数据获取完成!")
return f"{name}的结果"
async def main():
# 方法1:使用asyncio.gather并发执行
start = time.time()
# 同时发起三个请求
results = await asyncio.gather(
fetch_data(2, "百度"),
fetch_data(3, "谷歌"),
fetch_data(1, "必应")
)
end = time.time()
print(f"结果: {results}")
print(f"总耗时: {end - start:.2f}秒")
# 运行
asyncio.run(main())
案例3:实际应用 - 批量下载图片
import asyncio
import aiohttp
import aiofiles
import time
async def download_image(session, url, file_name):
"""异步下载单张图片"""
try:
async with session.get(url) as response:
if response.status == 200:
# 异步写入文件
content = await response.read()
async with aiofiles.open(file_name, 'wb') as f:
await f.write(content)
print(f"下载完成: {file_name}")
return True
else:
print(f"下载失败: {url}")
return False
except Exception as e:
print(f"下载出错: {url}, 错误: {e}")
return False
async def download_all_images():
"""批量下载图片"""
urls = [
("https://example.com/image1.jpg", "image1.jpg"),
("https://example.com/image2.jpg", "image2.jpg"),
("https://example.com/image3.jpg", "image3.jpg"),
]
async with aiohttp.ClientSession() as session:
# 创建所有下载任务
tasks = [download_image(session, url, name) for url, name in urls]
# 并发执行所有任务
results = await asyncio.gather(*tasks)
success_count = sum(results)
print(f"总共下载: {success_count}/{len(urls)} 张图片")
# 运行
start = time.time()
asyncio.run(download_all_images())
print(f"总耗时: {time.time() - start:.2f}秒")
案例4:生产者-消费者模式
import asyncio
import random
async def producer(queue):
"""生产者:生成数据"""
for i in range(5):
# 模拟生产数据耗时
await asyncio.sleep(random.uniform(0.5, 1.5))
item = f"数据-{i}"
await queue.put(item)
print(f"生产者: 生产了 {item}")
# 发送结束信号
await queue.put(None)
async def consumer(queue, name):
"""消费者:处理数据"""
while True:
item = await queue.get()
# 检查结束信号
if item is None:
queue.task_done()
break
# 模拟处理数据耗时
await asyncio.sleep(random.uniform(0.2, 0.8))
print(f"消费者 {name}: 处理了 {item}")
queue.task_done()
async def main():
# 创建队列
queue = asyncio.Queue(maxsize=3)
# 创建生产者和消费者任务
producer_task = asyncio.create_task(producer(queue))
consumer_tasks = [
asyncio.create_task(consumer(queue, f"Worker-{i}"))
for i in range(2)
]
# 等待所有任务完成
await producer_task
# 等待消费者处理完所有数据
await queue.join()
# 取消消费者任务
for task in consumer_tasks:
task.cancel()
# 运行
asyncio.run(main())
案例5:超时控制和错误处理
import asyncio
async def slow_operation():
"""模拟耗时操作"""
await asyncio.sleep(5)
return "操作结果"
async def fast_operation():
"""模拟快速操作"""
await asyncio.sleep(1)
return "快速结果"
async def main():
# 方法1:使用asyncio.wait_for设置超时
try:
result = await asyncio.wait_for(
slow_operation(),
timeout=3 # 3秒超时
)
print(f"成功: {result}")
except asyncio.TimeoutError:
print("操作超时!")
# 方法2:使用asyncio.wait控制并发
tasks = [
slow_operation(),
fast_operation()
]
done, pending = await asyncio.wait(
tasks,
timeout=2, # 最多等待2秒
return_when=asyncio.FIRST_COMPLETED # 第一个完成就返回
)
# 取消未完成的任务
for task in pending:
task.cancel()
print(f"已完成的任务数: {len(done)}")
for task in done:
print(f"结果: {task.result()}")
# 运行
asyncio.run(main())
案例6:Web API并发请求(实际应用)
import asyncio
import aiohttp
import json
class AsyncAPI:
"""异步API请求类"""
def __init__(self, base_url):
self.base_url = base_url
self.session = None
async def __aenter__(self):
self.session = aiohttp.ClientSession()
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.session.close()
async def get_user(self, user_id):
"""获取用户信息"""
url = f"{self.base_url}/users/{user_id}"
async with self.session.get(url) as response:
if response.status == 200:
data = await response.json()
return {"user_id": user_id, "data": data}
else:
return {"user_id": user_id, "error": f"HTTP {response.status}"}
async def get_users_batch(self, user_ids):
"""批量获取用户信息"""
tasks = [self.get_user(uid) for uid in user_ids]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
async def main():
# 使用示例(这里使用JSONPlaceholder作为示例API)
async with AsyncAPI("https://jsonplaceholder.typicode.com") as api:
user_ids = [1, 2, 3, 4, 5]
print("开始批量获取用户信息...")
results = await api.get_users_batch(user_ids)
for result in results:
if isinstance(result, Exception):
print(f"请求失败: {result}")
else:
print(f"用户 {result['user_id']}: {result.get('data', {}).get('name', 'N/A')}")
# 安装依赖: pip install aiohttp aiofiles
asyncio.run(main())
高级技巧
异步上下文管理器
import asyncio
class AsyncResource:
"""异步资源管理器"""
async def __aenter__(self):
print("打开资源...")
await asyncio.sleep(1)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("关闭资源...")
await asyncio.sleep(0.5)
async def do_something(self):
print("执行操作...")
await asyncio.sleep(0.5)
return "完成"
async def main():
async with AsyncResource() as resource:
result = await resource.do_something()
print(f"结果: {result}")
asyncio.run(main())
异步迭代器
import asyncio
class AsyncCounter:
"""异步计数器"""
def __init__(self, limit):
self.limit = limit
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current >= self.limit:
raise StopAsyncIteration
await asyncio.sleep(0.5) # 模拟异步操作
self.current += 1
return self.current
async def main():
async for number in AsyncCounter(5):
print(f"计数: {number}")
asyncio.run(main())
常见错误处理
import asyncio
# 错误示例1:忘记await
async def bad_example():
result = asyncio.sleep(1) # 错误!没有await
return result # 返回coroutine对象,不是结果
# 正确示例
async def good_example():
await asyncio.sleep(1)
return "正确结果"
# 错误示例2:在同步函数中使用await
def sync_function():
# await asyncio.sleep(1) # 错误!不能在同步函数中使用await
pass
# 正确示例:使用asyncio.run包装
async def async_function():
await asyncio.sleep(1)
print("异步执行")
# 错误示例3:阻塞主线程
async def blocking_example():
import time
time.sleep(5) # 错误!会阻塞整个事件循环
return "完成"
# 正确示例:使用asyncio.sleep
async def non_blocking_example():
await asyncio.sleep(5)
return "完成"
async def main():
try:
result = await good_example()
print(f"成功: {result}")
except Exception as e:
print(f"错误: {e}")
# 运行
asyncio.run(main())
性能对比
import asyncio
import time
import random
# 同步版本
def sync_task(name, delay):
print(f"同步任务 {name} 开始")
time.sleep(delay)
print(f"同步任务 {name} 结束")
return f"任务 {name} 完成"
def sync_main():
start = time.time()
results = []
for i in range(3):
result = sync_task(i, random.uniform(1, 3))
results.append(result)
print(f"同步耗时: {time.time() - start:.2f}秒")
return results
# 异步版本
async def async_task(name, delay):
print(f"异步任务 {name} 开始")
await asyncio.sleep(delay)
print(f"异步任务 {name} 结束")
return f"任务 {name} 完成"
async def async_main():
start = time.time()
tasks = [
async_task(i, random.uniform(1, 3))
for i in range(3)
]
results = await asyncio.gather(*tasks)
print(f"异步耗时: {time.time() - start:.2f}秒")
return results
# 运行对比
print("同步版本:")
sync_results = sync_main()
print("\n异步版本:")
async_results = asyncio.run(async_main())
安装依赖
使用异步HTTP请求需要安装额外库:
pip install aiohttp aiofiles
- 适用场景:I/O密集型任务(网络请求、文件读写、数据库操作)
- 不适用场景:CPU密集型任务(计算、加密等)
- 关键语法:
async def、await、asyncio.run() - 常用函数:
asyncio.gather()、asyncio.create_task()、asyncio.wait()
异步编程的核心是await关键字,它告诉Python:"这里可能需要等待,先去执行其他任务"!