本文目录导读:

我来详细解释Python中同步和异步结果的同步方法,包含多种实现方案:
基础概念理解
import asyncio
import time
# 同步方式
def sync_task(name, delay):
print(f"开始同步任务 {name}")
time.sleep(delay)
print(f"完成同步任务 {name}")
return f"结果 {name}"
# 异步方式
async def async_task(name, delay):
print(f"开始异步任务 {name}")
await asyncio.sleep(delay)
print(f"完成异步任务 {name}")
return f"结果 {name}"
同步结果到异步环境
方法1: 使用 loop.run_in_executor()
import asyncio
import time
def blocking_io():
print("开始阻塞IO操作")
time.sleep(2)
return "阻塞IO结果"
async def async_main():
loop = asyncio.get_running_loop()
# 将同步函数在线程池中执行
result = await loop.run_in_executor(None, blocking_io)
print(f"获取到异步结果: {result}")
# 与其他异步任务并行
async_result = await async_task("异步任务", 1)
print(f"异步任务结果: {async_result}")
async def async_task(name, delay):
await asyncio.sleep(delay)
return f"完成 {name}"
asyncio.run(async_main())
方法2: 使用 asyncio.to_thread() (Python 3.9+)
import asyncio
import time
def calculate_data(n):
time.sleep(2) # 模拟计算
return sum(range(n))
async def main():
# 在线程中运行同步代码
result = await asyncio.to_thread(calculate_data, 1000000)
print(f"计算结果: {result}")
# 并行执行多个同步任务
results = await asyncio.gather(
asyncio.to_thread(calculate_data, 100),
asyncio.to_thread(calculate_data, 200),
async_task("异步任务", 1)
)
print(f"所有结果: {results}")
async def async_task(name, delay):
await asyncio.sleep(delay)
return f"完成 {name}"
asyncio.run(main())
异步结果到同步环境
方法1: 使用 asyncio.run()
import asyncio
async def async_operation():
await asyncio.sleep(1)
return "异步结果"
# 在同步代码中运行异步函数
def sync_function():
print("开始同步函数")
result = asyncio.run(async_operation())
print(f"获取异步结果: {result}")
return result
sync_function()
方法2: 使用 loop.run_until_complete()
import asyncio
async def fetch_data(url):
await asyncio.sleep(2)
return f"数据来自 {url}"
def process_data():
# 创建事件循环
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
# 运行异步任务
result = loop.run_until_complete(fetch_data("https://example.com"))
print(f"获取数据: {result}")
# 运行多个任务
results = loop.run_until_complete(
asyncio.gather(
fetch_data("url1"),
fetch_data("url2"),
fetch_data("url3")
)
)
print(f"多个结果: {results}")
finally:
loop.close()
process_data()
实际应用场景示例
场景1: Web爬虫混合使用
import asyncio
import requests
import aiohttp
import time
class HybridCrawler:
def __init__(self):
self.session = requests.Session()
def sync_fetch(self, url):
"""同步HTTP请求"""
print(f"同步获取: {url}")
response = self.session.get(url)
return response.status_code
async def async_fetch(self, session, url):
"""异步HTTP请求"""
print(f"异步获取: {url}")
async with session.get(url) as response:
return response.status
async def hybrid_crawl(self, urls):
"""混合模式爬虫"""
# 同步方式获取部分数据
sync_results = []
for url in urls[:2]: # 前2个URL使用同步
result = await asyncio.to_thread(self.sync_fetch, url)
sync_results.append(result)
# 异步方式获取剩余数据
async with aiohttp.ClientSession() as session:
tasks = [self.async_fetch(session, url) for url in urls[2:]]
async_results = await asyncio.gather(*tasks)
return sync_results + async_results
async def main():
urls = [
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/4"
]
crawler = HybridCrawler()
results = await crawler.hybrid_crawl(urls)
print(f"爬取结果: {results}")
asyncio.run(main())
场景2: 数据库操作混合
import asyncio
import sqlite3
import aiosqlite
class DatabaseManager:
def __init__(self, db_path):
self.db_path = db_path
self.sync_conn = sqlite3.connect(db_path)
def sync_query(self, sql):
"""同步查询"""
cursor = self.sync_conn.cursor()
cursor.execute(sql)
return cursor.fetchall()
async def async_query(self, sql):
"""异步查询"""
async with aiosqlite.connect(self.db_path) as db:
cursor = await db.execute(sql)
return await cursor.fetchall()
async def hybrid_query(self, sync_sql, async_sql):
"""混合查询"""
# 同步查询在线程中运行
sync_task = asyncio.to_thread(self.sync_query, sync_sql)
# 异步查询直接运行
async_task = self.async_query(async_sql)
# 等待所有结果
sync_result, async_result = await asyncio.gather(
sync_task, async_task
)
return {
'sync': sync_result,
'async': async_result
}
async def main():
db = DatabaseManager(":memory:")
# 初始化数据
await db.async_query(
"CREATE TABLE IF NOT EXISTS test (id INTEGER, name TEXT)"
)
await db.async_query(
"INSERT INTO test VALUES (1, 'sync'), (2, 'async')"
)
# 混合查询
results = await db.hybrid_query(
"SELECT * FROM test WHERE id = 1",
"SELECT * FROM test WHERE id = 2"
)
print(f"混合查询结果: {results}")
asyncio.run(main())
高级同步技巧
使用 asyncio.Queue 同步结果
import asyncio
import random
async def producer(queue):
"""生产者:生成数据"""
for i in range(5):
await asyncio.sleep(random.random())
data = f"数据 {i}"
await queue.put(data)
print(f"生产: {data}")
# 发送结束信号
await queue.put(None)
async def consumer(queue, results):
"""消费者:处理数据"""
while True:
data = await queue.get()
if data is None:
break
result = f"处理: {data}"
results.append(result)
print(result)
async def sync_manager():
"""同步管理器"""
queue = asyncio.Queue()
results = []
# 同时运行生产者和消费者
await asyncio.gather(
producer(queue),
consumer(queue, results)
)
return results
results = asyncio.run(sync_manager())
print(f"最终结果: {results}")
最佳实践建议
import asyncio
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor
class ResultSynchronizer:
def __init__(self, max_workers=4):
self.thread_pool = ThreadPoolExecutor(max_workers=max_workers)
self.process_pool = ProcessPoolExecutor(max_workers=2)
async def sync_to_async(self, func, *args, use_process=False):
"""同步转异步"""
executor = self.process_pool if use_process else self.thread_pool
loop = asyncio.get_running_loop()
return await loop.run_in_executor(executor, func, *args)
async def batch_sync_to_async(self, tasks):
"""批量同步转异步"""
sync_tasks = []
for func, args, kwargs in tasks:
task = self.sync_to_async(
lambda: func(*args, **kwargs)
)
sync_tasks.append(task)
return await asyncio.gather(*sync_tasks)
# 使用示例
async def main():
synchronizer = ResultSynchronizer()
# 转换单个同步函数
result = await synchronizer.sync_to_async(
lambda: time.sleep(2) or "完成",
use_process=False
)
# 批量转换
tasks = [
(lambda x: x * 2, [1], {}),
(lambda x: x * 3, [2], {}),
(lambda x: x * 4, [3], {}),
]
results = await synchronizer.batch_sync_to_async(tasks)
print(f"批量结果: {results}")
asyncio.run(main())
关键要点:
- 同步转异步:使用
run_in_executor或to_thread - 异步转同步:使用
asyncio.run或run_until_complete - 结果收集:使用
gather,Queue,TaskGroup等 - 性能优化:合理选择线程池或进程池
- 错误处理:注意异常传递和处理
根据你的具体需求选择合适的同步方式,注意避免事件循环嵌套和死锁。