Python案例如何处理异步报错?

wen python案例 58

本文目录导读:

Python案例如何处理异步报错?

  1. 使用 try-except 捕获协程中的错误
  2. 使用 asyncio.gather() 处理多个协程
  3. 使用 asyncio.wait() 处理任务集
  4. 使用 asyncio.Task 对象
  5. 超时处理
  6. 使用回调函数处理异常
  7. 完整的实际案例
  8. 关键要点:

在Python中处理异步操作的错误,主要有以下几种方式:

使用 try-except 捕获协程中的错误

import asyncio
async def risky_operation():
    raise ValueError("某个值错误")
async def main():
    try:
        await risky_operation()
    except ValueError as e:
        print(f"捕获到错误: {e}")
asyncio.run(main())

使用 asyncio.gather() 处理多个协程

import asyncio
async def task1():
    await asyncio.sleep(1)
    return "任务1完成"
async def task2():
    await asyncio.sleep(0.5)
    raise RuntimeError("任务2错误")
async def main():
    try:
        results = await asyncio.gather(
            task1(),
            task2(),
            return_exceptions=False  # 遇到第一个错误就抛出
        )
    except RuntimeError as e:
        print(f"捕获到错误: {e}")
    # 使用 return_exceptions=True 收集所有结果(包括错误)
    results = await asyncio.gather(
        task1(),
        task2(),
        return_exceptions=True  # 将异常作为结果返回
    )
    for i, result in enumerate(results):
        if isinstance(result, Exception):
            print(f"任务{i}出错: {result}")
        else:
            print(f"任务{i}成功: {result}")
asyncio.run(main())

使用 asyncio.wait() 处理任务集

import asyncio
async def worker(name, success=True):
    await asyncio.sleep(1)
    if not success:
        raise Exception(f"{name} 失败")
    return f"{name} 成功"
async def main():
    tasks = [
        worker("A", True),
        worker("B", False),
        worker("C", True)
    ]
    done, pending = await asyncio.wait(
        tasks,
        return_when=asyncio.ALL_COMPLETED
    )
    for task in done:
        try:
            result = await task
            print(f"成功: {result}")
        except Exception as e:
            print(f"失败: {e}")
asyncio.run(main())

使用 asyncio.Task 对象

import asyncio
async def risky_task():
    raise ValueError("任务错误")
async def main():
    # 创建任务
    task = asyncio.create_task(risky_task())
    # 添加完成回调
    task.add_done_callback(lambda t: print(f"任务完成: {t}"))
    # 取消任务
    # task.cancel()
    try:
        result = await task
    except ValueError as e:
        print(f"任务错误: {e}")
    # 检查任务异常
    if task.exception():
        print(f"任务异常: {task.exception()}")
asyncio.run(main())

超时处理

import asyncio
async def slow_operation():
    await asyncio.sleep(10)
    return "完成"
async def main():
    try:
        # 设置超时时间
        result = await asyncio.wait_for(slow_operation(), timeout=5)
        print(result)
    except asyncio.TimeoutError:
        print("操作超时")
asyncio.run(main())

使用回调函数处理异常

import asyncio
import functools
def error_handler(loop, context):
    """全局错误处理器"""
    print(f"捕获到异常: {context}")
    print(f"异常消息: {context.get('exception')}")
async def problem_task():
    await asyncio.sleep(1)
    raise ValueError("发生错误")
async def main():
    loop = asyncio.get_running_loop()
    # 设置全局异常处理器
    loop.set_exception_handler(error_handler)
    task = asyncio.create_task(problem_task())
    try:
        await task
    except Exception as e:
        print(f"直接捕获: {e}")
asyncio.run(main())

完整的实际案例

import asyncio
import aiohttp
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class AsyncErrorHandler:
    async def fetch_url(self, url: str, timeout: int = 5):
        """异步获取URL内容"""
        try:
            async with aiohttp.ClientSession() as session:
                async with session.get(url, timeout=timeout) as response:
                    response.raise_for_status()
                    return await response.text()
        except asyncio.TimeoutError:
            logger.error(f"请求 {url} 超时")
            return None
        except aiohttp.ClientError as e:
            logger.error(f"请求 {url} 客户端错误: {e}")
            return None
        except Exception as e:
            logger.exception(f"请求 {url} 未知错误: {e}")
            return None
    async def batch_fetch(self, urls: list):
        """批量获取多个URL"""
        tasks = [self.fetch_url(url) for url in urls]
        # 使用return_exceptions收集所有结果
        results = await asyncio.gather(*tasks, return_exceptions=True)
        successful = []
        failed = []
        for url, result in zip(urls, results):
            if isinstance(result, Exception):
                logger.error(f"获取 {url} 失败: {result}")
                failed.append(url)
            elif result is None:
                failed.append(url)
            else:
                successful.append((url, result[:100]))  # 只保存前100字符
        return successful, failed
async def main():
    handler = AsyncErrorHandler()
    urls = [
        "https://httpbin.org/delay/1",  # 延迟1秒
        "https://httpbin.org/status/404",  # 返回404
        "https://invalid-url.com",  # 无效URL
        "https://httpbin.org/get"  # 成功
    ]
    successful, failed = await handler.batch_fetch(urls)
    print(f"成功: {len(successful)} 个URL")
    print(f"失败: {len(failed)} 个URL")
    for url, content in successful:
        print(f"  {url}: {content}")
if __name__ == "__main__":
    asyncio.run(main())

关键要点:

  1. 使用 try-except: 最基础的错误处理方式
  2. return_exceptions=True: 在 gather 中收集异常而非抛出
  3. 任务回调: 使用 add_done_callback 监控任务状态
  4. 超时控制: 使用 asyncio.wait_for 防止永久等待
  5. 日志记录: 使用 logging 模块记录错误信息
  6. 异常类型: 区分不同类型的异常并合理处理

这些方法可以根据具体情况组合使用,构建健壮的异步程序。

抱歉,评论功能暂时关闭!