本文目录导读:

在Python中实现异步回调,主要有以下几种常见方式,我将为你提供完整的代码案例:
使用回调函数(传统方式)
import asyncio
from typing import Callable
# 定义一个异步任务
async def async_task(data: str, callback: Callable):
print(f"开始处理: {data}")
await asyncio.sleep(2) # 模拟耗时操作
result = f"处理完成: {data.upper()}"
# 调用回调函数
if callback:
callback(result)
return result
# 回调函数
def my_callback(result: str):
print(f"回调函数收到: {result}")
# 使用示例
async def main():
print("启动异步任务")
await async_task("hello", my_callback)
print("异步任务完成")
if __name__ == "__main__":
asyncio.run(main())
使用Future实现回调
import asyncio
async def compute_something(future: asyncio.Future):
await asyncio.sleep(1)
# 设置Future的结果,触发回调
future.set_result("计算完成的结果")
def future_callback(future):
print(f"Future回调: {future.result()}")
async def main():
loop = asyncio.get_event_loop()
future = loop.create_future()
# 添加回调
future.add_done_callback(future_callback)
# 启动异步任务
await compute_something(future)
if __name__ == "__main__":
asyncio.run(main())
使用装饰器实现回调注册
import asyncio
from functools import wraps
from typing import Callable, List
class AsyncCallbackSystem:
def __init__(self):
self.callbacks: List[Callable] = []
def register_callback(self, callback: Callable):
"""注册回调函数"""
self.callbacks.append(callback)
return callback
def unregister_callback(self, callback: Callable):
"""取消注册"""
if callback in self.callbacks:
self.callbacks.remove(callback)
async def execute_callback(self, data):
"""执行所有注册的回调"""
for callback in self.callbacks:
if asyncio.iscoroutinefunction(callback):
await callback(data)
else:
callback(data)
# 创建回调系统
callback_system = AsyncCallbackSystem()
# 注册同步回调
@callback_system.register_callback
def sync_callback(data):
print(f"同步回调: {data}")
# 注册异步回调
@callback_system.register_callback
async def async_callback(data):
await asyncio.sleep(0.5)
print(f"异步回调: {data.upper()}")
async def main_task():
print("执行主任务...")
await asyncio.sleep(1)
# 触发回调
await callback_system.execute_callback("任务完成")
# 取消一个回调
callback_system.unregister_callback(sync_callback)
print("已取消同步回调")
async def main():
await main_task()
if __name__ == "__main__":
asyncio.run(main())
使用类实现回调(面向对象方式)
import asyncio
from typing import Any, Callable, List, Optional
class AsyncEventEmitter:
"""异步事件发射器"""
def __init__(self):
self._events = {}
def on(self, event_name: str, callback: Callable):
"""注册事件监听"""
if event_name not in self._events:
self._events[event_name] = []
self._events[event_name].append(callback)
return self
def remove_listener(self, event_name: str, callback: Callable):
"""移除事件监听"""
if event_name in self._events:
self._events[event_name] = [
cb for cb in self._events[event_name] if cb != callback
]
async def emit(self, event_name: str, *args, **kwargs):
"""触发事件"""
if event_name in self._events:
for callback in self._events[event_name]:
if asyncio.iscoroutinefunction(callback):
await callback(*args, **kwargs)
else:
callback(*args, **kwargs)
# 使用示例
emitter = AsyncEventEmitter()
# 定义事件处理函数
async def on_data_received(data):
print(f"收到数据: {data}")
await asyncio.sleep(0.5)
print("数据处理完成")
def on_error(error):
print(f"错误信息: {error}")
# 注册事件
emitter.on("data", on_data_received)
emitter.on("error", on_error)
async def data_producer():
"""数据生产者"""
for i in range(3):
await asyncio.sleep(1)
if i == 2:
# 触发错误事件
await emitter.emit("error", f"第{i+1}次数据错误")
else:
# 触发数据事件
await emitter.emit("data", f"数据_{i+1}")
async def main():
print("开始数据生产...")
await data_producer()
print("所有数据生产完成")
if __name__ == "__main__":
asyncio.run(main())
使用asyncio.Queue实现回调队列
import asyncio
from typing import Callable
class CallbackQueue:
"""回调队列管理"""
def __init__(self):
self.queue = asyncio.Queue()
self.running = False
async def add_callback(self, callback: Callable, *args, **kwargs):
"""添加回调到队列"""
await self.queue.put((callback, args, kwargs))
async def process_queue(self):
"""处理队列中的回调"""
self.running = True
while self.running:
try:
callback, args, kwargs = await asyncio.wait_for(
self.queue.get(), timeout=1.0
)
# 执行回调
if asyncio.iscoroutinefunction(callback):
await callback(*args, **kwargs)
else:
callback(*args, **kwargs)
self.queue.task_done()
except asyncio.TimeoutError:
continue
except Exception as e:
print(f"回调执行错误: {e}")
def stop(self):
"""停止处理队列"""
self.running = False
# 定义回调函数
async def async_callback_handler(data):
await asyncio.sleep(0.5)
print(f"异步处理: {data}")
def sync_callback_handler(data):
print(f"同步处理: {data}")
async def main():
callback_queue = CallbackQueue()
# 启动队列处理器
processor_task = asyncio.create_task(callback_queue.process_queue())
# 添加回调
for i in range(5):
await callback_queue.add_callback(
async_callback_handler,
f"异步数据_{i+1}"
)
await callback_queue.add_callback(
sync_callback_handler,
f"同步数据_{i+1}"
)
# 等待队列处理完成
await asyncio.sleep(3)
# 停止处理器
callback_queue.stop()
await processor_task
if __name__ == "__main__":
asyncio.run(main())
最佳实践建议
-
选择合适的回调方式:
- 简单场景:使用基础回调函数
- 复杂场景:使用事件发射器或回调队列
-
错误处理:
async def safe_callback_executor(callback, *args, **kwargs): try: if asyncio.iscoroutinefunction(callback): return await callback(*args, **kwargs) else: return callback(*args, **kwargs) except Exception as e: print(f"回调执行失败: {e}") # 可以记录日志或进行其他错误处理 -
避免回调地狱:
- 使用
asyncio.gather()并行处理多个回调 - 考虑使用
async/await模式替代深层嵌套的回调
- 使用
这些案例展示了Python中实现异步回调的不同方式,你可以根据实际需求选择最适合的方案。