Python案例如何实现异步回调?

wen python案例 57

本文目录导读:

Python案例如何实现异步回调?

  1. 使用回调函数(传统方式)
  2. 使用Future实现回调
  3. 使用装饰器实现回调注册
  4. 使用类实现回调(面向对象方式)
  5. 使用asyncio.Queue实现回调队列
  6. 最佳实践建议

在Python中实现异步回调,主要有以下几种常见方式,我将为你提供完整的代码案例:

使用回调函数(传统方式)

import asyncio
from typing import Callable
# 定义一个异步任务
async def async_task(data: str, callback: Callable):
    print(f"开始处理: {data}")
    await asyncio.sleep(2)  # 模拟耗时操作
    result = f"处理完成: {data.upper()}"
    # 调用回调函数
    if callback:
        callback(result)
    return result
# 回调函数
def my_callback(result: str):
    print(f"回调函数收到: {result}")
# 使用示例
async def main():
    print("启动异步任务")
    await async_task("hello", my_callback)
    print("异步任务完成")
if __name__ == "__main__":
    asyncio.run(main())

使用Future实现回调

import asyncio
async def compute_something(future: asyncio.Future):
    await asyncio.sleep(1)
    # 设置Future的结果,触发回调
    future.set_result("计算完成的结果")
def future_callback(future):
    print(f"Future回调: {future.result()}")
async def main():
    loop = asyncio.get_event_loop()
    future = loop.create_future()
    # 添加回调
    future.add_done_callback(future_callback)
    # 启动异步任务
    await compute_something(future)
if __name__ == "__main__":
    asyncio.run(main())

使用装饰器实现回调注册

import asyncio
from functools import wraps
from typing import Callable, List
class AsyncCallbackSystem:
    def __init__(self):
        self.callbacks: List[Callable] = []
    def register_callback(self, callback: Callable):
        """注册回调函数"""
        self.callbacks.append(callback)
        return callback
    def unregister_callback(self, callback: Callable):
        """取消注册"""
        if callback in self.callbacks:
            self.callbacks.remove(callback)
    async def execute_callback(self, data):
        """执行所有注册的回调"""
        for callback in self.callbacks:
            if asyncio.iscoroutinefunction(callback):
                await callback(data)
            else:
                callback(data)
# 创建回调系统
callback_system = AsyncCallbackSystem()
# 注册同步回调
@callback_system.register_callback
def sync_callback(data):
    print(f"同步回调: {data}")
# 注册异步回调
@callback_system.register_callback
async def async_callback(data):
    await asyncio.sleep(0.5)
    print(f"异步回调: {data.upper()}")
async def main_task():
    print("执行主任务...")
    await asyncio.sleep(1)
    # 触发回调
    await callback_system.execute_callback("任务完成")
    # 取消一个回调
    callback_system.unregister_callback(sync_callback)
    print("已取消同步回调")
async def main():
    await main_task()
if __name__ == "__main__":
    asyncio.run(main())

使用类实现回调(面向对象方式)

import asyncio
from typing import Any, Callable, List, Optional
class AsyncEventEmitter:
    """异步事件发射器"""
    def __init__(self):
        self._events = {}
    def on(self, event_name: str, callback: Callable):
        """注册事件监听"""
        if event_name not in self._events:
            self._events[event_name] = []
        self._events[event_name].append(callback)
        return self
    def remove_listener(self, event_name: str, callback: Callable):
        """移除事件监听"""
        if event_name in self._events:
            self._events[event_name] = [
                cb for cb in self._events[event_name] if cb != callback
            ]
    async def emit(self, event_name: str, *args, **kwargs):
        """触发事件"""
        if event_name in self._events:
            for callback in self._events[event_name]:
                if asyncio.iscoroutinefunction(callback):
                    await callback(*args, **kwargs)
                else:
                    callback(*args, **kwargs)
# 使用示例
emitter = AsyncEventEmitter()
# 定义事件处理函数
async def on_data_received(data):
    print(f"收到数据: {data}")
    await asyncio.sleep(0.5)
    print("数据处理完成")
def on_error(error):
    print(f"错误信息: {error}")
# 注册事件
emitter.on("data", on_data_received)
emitter.on("error", on_error)
async def data_producer():
    """数据生产者"""
    for i in range(3):
        await asyncio.sleep(1)
        if i == 2:
            # 触发错误事件
            await emitter.emit("error", f"第{i+1}次数据错误")
        else:
            # 触发数据事件
            await emitter.emit("data", f"数据_{i+1}")
async def main():
    print("开始数据生产...")
    await data_producer()
    print("所有数据生产完成")
if __name__ == "__main__":
    asyncio.run(main())

使用asyncio.Queue实现回调队列

import asyncio
from typing import Callable
class CallbackQueue:
    """回调队列管理"""
    def __init__(self):
        self.queue = asyncio.Queue()
        self.running = False
    async def add_callback(self, callback: Callable, *args, **kwargs):
        """添加回调到队列"""
        await self.queue.put((callback, args, kwargs))
    async def process_queue(self):
        """处理队列中的回调"""
        self.running = True
        while self.running:
            try:
                callback, args, kwargs = await asyncio.wait_for(
                    self.queue.get(), timeout=1.0
                )
                # 执行回调
                if asyncio.iscoroutinefunction(callback):
                    await callback(*args, **kwargs)
                else:
                    callback(*args, **kwargs)
                self.queue.task_done()
            except asyncio.TimeoutError:
                continue
            except Exception as e:
                print(f"回调执行错误: {e}")
    def stop(self):
        """停止处理队列"""
        self.running = False
# 定义回调函数
async def async_callback_handler(data):
    await asyncio.sleep(0.5)
    print(f"异步处理: {data}")
def sync_callback_handler(data):
    print(f"同步处理: {data}")
async def main():
    callback_queue = CallbackQueue()
    # 启动队列处理器
    processor_task = asyncio.create_task(callback_queue.process_queue())
    # 添加回调
    for i in range(5):
        await callback_queue.add_callback(
            async_callback_handler, 
            f"异步数据_{i+1}"
        )
        await callback_queue.add_callback(
            sync_callback_handler,
            f"同步数据_{i+1}"
        )
    # 等待队列处理完成
    await asyncio.sleep(3)
    # 停止处理器
    callback_queue.stop()
    await processor_task
if __name__ == "__main__":
    asyncio.run(main())

最佳实践建议

  1. 选择合适的回调方式

    • 简单场景:使用基础回调函数
    • 复杂场景:使用事件发射器或回调队列
  2. 错误处理

    async def safe_callback_executor(callback, *args, **kwargs):
        try:
            if asyncio.iscoroutinefunction(callback):
                return await callback(*args, **kwargs)
            else:
                return callback(*args, **kwargs)
        except Exception as e:
            print(f"回调执行失败: {e}")
            # 可以记录日志或进行其他错误处理
  3. 避免回调地狱

    • 使用 asyncio.gather() 并行处理多个回调
    • 考虑使用 async/await 模式替代深层嵌套的回调

这些案例展示了Python中实现异步回调的不同方式,你可以根据实际需求选择最适合的方案。

抱歉,评论功能暂时关闭!