Python案例怎么同步异步结果?

wen python案例 55

本文目录导读:

Python案例怎么同步异步结果?

  1. 基础概念理解
  2. 同步结果到异步环境
  3. 异步结果到同步环境
  4. 实际应用场景示例
  5. 高级同步技巧
  6. 最佳实践建议

我来详细解释Python中同步和异步结果的同步方法,包含多种实现方案:

基础概念理解

import asyncio
import time
# 同步方式
def sync_task(name, delay):
    print(f"开始同步任务 {name}")
    time.sleep(delay)
    print(f"完成同步任务 {name}")
    return f"结果 {name}"
# 异步方式
async def async_task(name, delay):
    print(f"开始异步任务 {name}")
    await asyncio.sleep(delay)
    print(f"完成异步任务 {name}")
    return f"结果 {name}"

同步结果到异步环境

方法1: 使用 loop.run_in_executor()

import asyncio
import time
def blocking_io():
    print("开始阻塞IO操作")
    time.sleep(2)
    return "阻塞IO结果"
async def async_main():
    loop = asyncio.get_running_loop()
    # 将同步函数在线程池中执行
    result = await loop.run_in_executor(None, blocking_io)
    print(f"获取到异步结果: {result}")
    # 与其他异步任务并行
    async_result = await async_task("异步任务", 1)
    print(f"异步任务结果: {async_result}")
async def async_task(name, delay):
    await asyncio.sleep(delay)
    return f"完成 {name}"
asyncio.run(async_main())

方法2: 使用 asyncio.to_thread() (Python 3.9+)

import asyncio
import time
def calculate_data(n):
    time.sleep(2)  # 模拟计算
    return sum(range(n))
async def main():
    # 在线程中运行同步代码
    result = await asyncio.to_thread(calculate_data, 1000000)
    print(f"计算结果: {result}")
    # 并行执行多个同步任务
    results = await asyncio.gather(
        asyncio.to_thread(calculate_data, 100),
        asyncio.to_thread(calculate_data, 200),
        async_task("异步任务", 1)
    )
    print(f"所有结果: {results}")
async def async_task(name, delay):
    await asyncio.sleep(delay)
    return f"完成 {name}"
asyncio.run(main())

异步结果到同步环境

方法1: 使用 asyncio.run()

import asyncio
async def async_operation():
    await asyncio.sleep(1)
    return "异步结果"
# 在同步代码中运行异步函数
def sync_function():
    print("开始同步函数")
    result = asyncio.run(async_operation())
    print(f"获取异步结果: {result}")
    return result
sync_function()

方法2: 使用 loop.run_until_complete()

import asyncio
async def fetch_data(url):
    await asyncio.sleep(2)
    return f"数据来自 {url}"
def process_data():
    # 创建事件循环
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        # 运行异步任务
        result = loop.run_until_complete(fetch_data("https://example.com"))
        print(f"获取数据: {result}")
        # 运行多个任务
        results = loop.run_until_complete(
            asyncio.gather(
                fetch_data("url1"),
                fetch_data("url2"),
                fetch_data("url3")
            )
        )
        print(f"多个结果: {results}")
    finally:
        loop.close()
process_data()

实际应用场景示例

场景1: Web爬虫混合使用

import asyncio
import requests
import aiohttp
import time
class HybridCrawler:
    def __init__(self):
        self.session = requests.Session()
    def sync_fetch(self, url):
        """同步HTTP请求"""
        print(f"同步获取: {url}")
        response = self.session.get(url)
        return response.status_code
    async def async_fetch(self, session, url):
        """异步HTTP请求"""
        print(f"异步获取: {url}")
        async with session.get(url) as response:
            return response.status
    async def hybrid_crawl(self, urls):
        """混合模式爬虫"""
        # 同步方式获取部分数据
        sync_results = []
        for url in urls[:2]:  # 前2个URL使用同步
            result = await asyncio.to_thread(self.sync_fetch, url)
            sync_results.append(result)
        # 异步方式获取剩余数据
        async with aiohttp.ClientSession() as session:
            tasks = [self.async_fetch(session, url) for url in urls[2:]]
            async_results = await asyncio.gather(*tasks)
        return sync_results + async_results
async def main():
    urls = [
        "https://httpbin.org/delay/1",
        "https://httpbin.org/delay/2",
        "https://httpbin.org/delay/3",
        "https://httpbin.org/delay/4"
    ]
    crawler = HybridCrawler()
    results = await crawler.hybrid_crawl(urls)
    print(f"爬取结果: {results}")
asyncio.run(main())

场景2: 数据库操作混合

import asyncio
import sqlite3
import aiosqlite
class DatabaseManager:
    def __init__(self, db_path):
        self.db_path = db_path
        self.sync_conn = sqlite3.connect(db_path)
    def sync_query(self, sql):
        """同步查询"""
        cursor = self.sync_conn.cursor()
        cursor.execute(sql)
        return cursor.fetchall()
    async def async_query(self, sql):
        """异步查询"""
        async with aiosqlite.connect(self.db_path) as db:
            cursor = await db.execute(sql)
            return await cursor.fetchall()
    async def hybrid_query(self, sync_sql, async_sql):
        """混合查询"""
        # 同步查询在线程中运行
        sync_task = asyncio.to_thread(self.sync_query, sync_sql)
        # 异步查询直接运行
        async_task = self.async_query(async_sql)
        # 等待所有结果
        sync_result, async_result = await asyncio.gather(
            sync_task, async_task
        )
        return {
            'sync': sync_result,
            'async': async_result
        }
async def main():
    db = DatabaseManager(":memory:")
    # 初始化数据
    await db.async_query(
        "CREATE TABLE IF NOT EXISTS test (id INTEGER, name TEXT)"
    )
    await db.async_query(
        "INSERT INTO test VALUES (1, 'sync'), (2, 'async')"
    )
    # 混合查询
    results = await db.hybrid_query(
        "SELECT * FROM test WHERE id = 1",
        "SELECT * FROM test WHERE id = 2"
    )
    print(f"混合查询结果: {results}")
asyncio.run(main())

高级同步技巧

使用 asyncio.Queue 同步结果

import asyncio
import random
async def producer(queue):
    """生产者:生成数据"""
    for i in range(5):
        await asyncio.sleep(random.random())
        data = f"数据 {i}"
        await queue.put(data)
        print(f"生产: {data}")
    # 发送结束信号
    await queue.put(None)
async def consumer(queue, results):
    """消费者:处理数据"""
    while True:
        data = await queue.get()
        if data is None:
            break
        result = f"处理: {data}"
        results.append(result)
        print(result)
async def sync_manager():
    """同步管理器"""
    queue = asyncio.Queue()
    results = []
    # 同时运行生产者和消费者
    await asyncio.gather(
        producer(queue),
        consumer(queue, results)
    )
    return results
results = asyncio.run(sync_manager())
print(f"最终结果: {results}")

最佳实践建议

import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
class ResultSynchronizer:
    def __init__(self, max_workers=4):
        self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
        self.process_pool = ProcessPoolExecutor(max_workers=2)
    async def sync_to_async(self, func, *args, use_process=False):
        """同步转异步"""
        executor = self.process_pool if use_process else self.thread_pool
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(executor, func, *args)
    async def batch_sync_to_async(self, tasks):
        """批量同步转异步"""
        sync_tasks = []
        for func, args, kwargs in tasks:
            task = self.sync_to_async(
                lambda: func(*args, **kwargs)
            )
            sync_tasks.append(task)
        return await asyncio.gather(*sync_tasks)
# 使用示例
async def main():
    synchronizer = ResultSynchronizer()
    # 转换单个同步函数
    result = await synchronizer.sync_to_async(
        lambda: time.sleep(2) or "完成",
        use_process=False
    )
    # 批量转换
    tasks = [
        (lambda x: x * 2, [1], {}),
        (lambda x: x * 3, [2], {}),
        (lambda x: x * 4, [3], {}),
    ]
    results = await synchronizer.batch_sync_to_async(tasks)
    print(f"批量结果: {results}")
asyncio.run(main())

关键要点:

  1. 同步转异步:使用 run_in_executorto_thread
  2. 异步转同步:使用 asyncio.runrun_until_complete
  3. 结果收集:使用 gather, Queue, TaskGroup
  4. 性能优化:合理选择线程池或进程池
  5. 错误处理:注意异常传递和处理

根据你的具体需求选择合适的同步方式,注意避免事件循环嵌套和死锁。

抱歉,评论功能暂时关闭!