本文目录导读:

- 使用 try-except 捕获协程中的错误
- 使用 asyncio.gather() 处理多个协程
- 使用 asyncio.wait() 处理任务集
- 使用 asyncio.Task 对象
- 超时处理
- 使用回调函数处理异常
- 完整的实际案例
- 关键要点:
在Python中处理异步操作的错误,主要有以下几种方式:
使用 try-except 捕获协程中的错误
import asyncio
async def risky_operation():
raise ValueError("某个值错误")
async def main():
try:
await risky_operation()
except ValueError as e:
print(f"捕获到错误: {e}")
asyncio.run(main())
使用 asyncio.gather() 处理多个协程
import asyncio
async def task1():
await asyncio.sleep(1)
return "任务1完成"
async def task2():
await asyncio.sleep(0.5)
raise RuntimeError("任务2错误")
async def main():
try:
results = await asyncio.gather(
task1(),
task2(),
return_exceptions=False # 遇到第一个错误就抛出
)
except RuntimeError as e:
print(f"捕获到错误: {e}")
# 使用 return_exceptions=True 收集所有结果(包括错误)
results = await asyncio.gather(
task1(),
task2(),
return_exceptions=True # 将异常作为结果返回
)
for i, result in enumerate(results):
if isinstance(result, Exception):
print(f"任务{i}出错: {result}")
else:
print(f"任务{i}成功: {result}")
asyncio.run(main())
使用 asyncio.wait() 处理任务集
import asyncio
async def worker(name, success=True):
await asyncio.sleep(1)
if not success:
raise Exception(f"{name} 失败")
return f"{name} 成功"
async def main():
tasks = [
worker("A", True),
worker("B", False),
worker("C", True)
]
done, pending = await asyncio.wait(
tasks,
return_when=asyncio.ALL_COMPLETED
)
for task in done:
try:
result = await task
print(f"成功: {result}")
except Exception as e:
print(f"失败: {e}")
asyncio.run(main())
使用 asyncio.Task 对象
import asyncio
async def risky_task():
raise ValueError("任务错误")
async def main():
# 创建任务
task = asyncio.create_task(risky_task())
# 添加完成回调
task.add_done_callback(lambda t: print(f"任务完成: {t}"))
# 取消任务
# task.cancel()
try:
result = await task
except ValueError as e:
print(f"任务错误: {e}")
# 检查任务异常
if task.exception():
print(f"任务异常: {task.exception()}")
asyncio.run(main())
超时处理
import asyncio
async def slow_operation():
await asyncio.sleep(10)
return "完成"
async def main():
try:
# 设置超时时间
result = await asyncio.wait_for(slow_operation(), timeout=5)
print(result)
except asyncio.TimeoutError:
print("操作超时")
asyncio.run(main())
使用回调函数处理异常
import asyncio
import functools
def error_handler(loop, context):
"""全局错误处理器"""
print(f"捕获到异常: {context}")
print(f"异常消息: {context.get('exception')}")
async def problem_task():
await asyncio.sleep(1)
raise ValueError("发生错误")
async def main():
loop = asyncio.get_running_loop()
# 设置全局异常处理器
loop.set_exception_handler(error_handler)
task = asyncio.create_task(problem_task())
try:
await task
except Exception as e:
print(f"直接捕获: {e}")
asyncio.run(main())
完整的实际案例
import asyncio
import aiohttp
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncErrorHandler:
async def fetch_url(self, url: str, timeout: int = 5):
"""异步获取URL内容"""
try:
async with aiohttp.ClientSession() as session:
async with session.get(url, timeout=timeout) as response:
response.raise_for_status()
return await response.text()
except asyncio.TimeoutError:
logger.error(f"请求 {url} 超时")
return None
except aiohttp.ClientError as e:
logger.error(f"请求 {url} 客户端错误: {e}")
return None
except Exception as e:
logger.exception(f"请求 {url} 未知错误: {e}")
return None
async def batch_fetch(self, urls: list):
"""批量获取多个URL"""
tasks = [self.fetch_url(url) for url in urls]
# 使用return_exceptions收集所有结果
results = await asyncio.gather(*tasks, return_exceptions=True)
successful = []
failed = []
for url, result in zip(urls, results):
if isinstance(result, Exception):
logger.error(f"获取 {url} 失败: {result}")
failed.append(url)
elif result is None:
failed.append(url)
else:
successful.append((url, result[:100])) # 只保存前100字符
return successful, failed
async def main():
handler = AsyncErrorHandler()
urls = [
"https://httpbin.org/delay/1", # 延迟1秒
"https://httpbin.org/status/404", # 返回404
"https://invalid-url.com", # 无效URL
"https://httpbin.org/get" # 成功
]
successful, failed = await handler.batch_fetch(urls)
print(f"成功: {len(successful)} 个URL")
print(f"失败: {len(failed)} 个URL")
for url, content in successful:
print(f" {url}: {content}")
if __name__ == "__main__":
asyncio.run(main())
关键要点:
- 使用 try-except: 最基础的错误处理方式
- return_exceptions=True: 在 gather 中收集异常而非抛出
- 任务回调: 使用 add_done_callback 监控任务状态
- 超时控制: 使用 asyncio.wait_for 防止永久等待
- 日志记录: 使用 logging 模块记录错误信息
- 异常类型: 区分不同类型的异常并合理处理
这些方法可以根据具体情况组合使用,构建健壮的异步程序。