Python案例怎么使用async异步?

wen python案例 10

本文目录导读:

Python案例怎么使用async异步?

  1. 基础概念
  2. 基础案例
  3. 高级技巧
  4. 常见错误处理
  5. 性能对比
  6. 安装依赖

我来为你详细讲解Python中async/await异步编程的使用方法,配合实际案例。

基础概念

什么是异步编程?

异步编程允许程序在等待I/O操作(如网络请求、文件读写)时,切换执行其他任务,提高程序效率。

基础案例

案例1:最基础的异步函数

import asyncio
# 定义异步函数
async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 模拟耗时操作
    print("World")
# 运行异步函数
asyncio.run(hello())

案例2:多个异步任务并发执行

import asyncio
import time
async def fetch_data(delay, name):
    """模拟网络请求"""
    print(f"开始获取{name}数据...")
    await asyncio.sleep(delay)  # 模拟网络延迟
    print(f"{name}数据获取完成!")
    return f"{name}的结果"
async def main():
    # 方法1:使用asyncio.gather并发执行
    start = time.time()
    # 同时发起三个请求
    results = await asyncio.gather(
        fetch_data(2, "百度"),
        fetch_data(3, "谷歌"),
        fetch_data(1, "必应")
    )
    end = time.time()
    print(f"结果: {results}")
    print(f"总耗时: {end - start:.2f}秒")
# 运行
asyncio.run(main())

案例3:实际应用 - 批量下载图片

import asyncio
import aiohttp
import aiofiles
import time
async def download_image(session, url, file_name):
    """异步下载单张图片"""
    try:
        async with session.get(url) as response:
            if response.status == 200:
                # 异步写入文件
                content = await response.read()
                async with aiofiles.open(file_name, 'wb') as f:
                    await f.write(content)
                print(f"下载完成: {file_name}")
                return True
            else:
                print(f"下载失败: {url}")
                return False
    except Exception as e:
        print(f"下载出错: {url}, 错误: {e}")
        return False
async def download_all_images():
    """批量下载图片"""
    urls = [
        ("https://example.com/image1.jpg", "image1.jpg"),
        ("https://example.com/image2.jpg", "image2.jpg"),
        ("https://example.com/image3.jpg", "image3.jpg"),
    ]
    async with aiohttp.ClientSession() as session:
        # 创建所有下载任务
        tasks = [download_image(session, url, name) for url, name in urls]
        # 并发执行所有任务
        results = await asyncio.gather(*tasks)
        success_count = sum(results)
        print(f"总共下载: {success_count}/{len(urls)} 张图片")
# 运行
start = time.time()
asyncio.run(download_all_images())
print(f"总耗时: {time.time() - start:.2f}秒")

案例4:生产者-消费者模式

import asyncio
import random
async def producer(queue):
    """生产者:生成数据"""
    for i in range(5):
        # 模拟生产数据耗时
        await asyncio.sleep(random.uniform(0.5, 1.5))
        item = f"数据-{i}"
        await queue.put(item)
        print(f"生产者: 生产了 {item}")
    # 发送结束信号
    await queue.put(None)
async def consumer(queue, name):
    """消费者:处理数据"""
    while True:
        item = await queue.get()
        # 检查结束信号
        if item is None:
            queue.task_done()
            break
        # 模拟处理数据耗时
        await asyncio.sleep(random.uniform(0.2, 0.8))
        print(f"消费者 {name}: 处理了 {item}")
        queue.task_done()
async def main():
    # 创建队列
    queue = asyncio.Queue(maxsize=3)
    # 创建生产者和消费者任务
    producer_task = asyncio.create_task(producer(queue))
    consumer_tasks = [
        asyncio.create_task(consumer(queue, f"Worker-{i}"))
        for i in range(2)
    ]
    # 等待所有任务完成
    await producer_task
    # 等待消费者处理完所有数据
    await queue.join()
    # 取消消费者任务
    for task in consumer_tasks:
        task.cancel()
# 运行
asyncio.run(main())

案例5:超时控制和错误处理

import asyncio
async def slow_operation():
    """模拟耗时操作"""
    await asyncio.sleep(5)
    return "操作结果"
async def fast_operation():
    """模拟快速操作"""
    await asyncio.sleep(1)
    return "快速结果"
async def main():
    # 方法1:使用asyncio.wait_for设置超时
    try:
        result = await asyncio.wait_for(
            slow_operation(),
            timeout=3  # 3秒超时
        )
        print(f"成功: {result}")
    except asyncio.TimeoutError:
        print("操作超时!")
    # 方法2:使用asyncio.wait控制并发
    tasks = [
        slow_operation(),
        fast_operation()
    ]
    done, pending = await asyncio.wait(
        tasks,
        timeout=2,  # 最多等待2秒
        return_when=asyncio.FIRST_COMPLETED  # 第一个完成就返回
    )
    # 取消未完成的任务
    for task in pending:
        task.cancel()
    print(f"已完成的任务数: {len(done)}")
    for task in done:
        print(f"结果: {task.result()}")
# 运行
asyncio.run(main())

案例6:Web API并发请求(实际应用)

import asyncio
import aiohttp
import json
class AsyncAPI:
    """异步API请求类"""
    def __init__(self, base_url):
        self.base_url = base_url
        self.session = None
    async def __aenter__(self):
        self.session = aiohttp.ClientSession()
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        await self.session.close()
    async def get_user(self, user_id):
        """获取用户信息"""
        url = f"{self.base_url}/users/{user_id}"
        async with self.session.get(url) as response:
            if response.status == 200:
                data = await response.json()
                return {"user_id": user_id, "data": data}
            else:
                return {"user_id": user_id, "error": f"HTTP {response.status}"}
    async def get_users_batch(self, user_ids):
        """批量获取用户信息"""
        tasks = [self.get_user(uid) for uid in user_ids]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results
async def main():
    # 使用示例(这里使用JSONPlaceholder作为示例API)
    async with AsyncAPI("https://jsonplaceholder.typicode.com") as api:
        user_ids = [1, 2, 3, 4, 5]
        print("开始批量获取用户信息...")
        results = await api.get_users_batch(user_ids)
        for result in results:
            if isinstance(result, Exception):
                print(f"请求失败: {result}")
            else:
                print(f"用户 {result['user_id']}: {result.get('data', {}).get('name', 'N/A')}")
# 安装依赖: pip install aiohttp aiofiles
asyncio.run(main())

高级技巧

异步上下文管理器

import asyncio
class AsyncResource:
    """异步资源管理器"""
    async def __aenter__(self):
        print("打开资源...")
        await asyncio.sleep(1)
        return self
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        print("关闭资源...")
        await asyncio.sleep(0.5)
    async def do_something(self):
        print("执行操作...")
        await asyncio.sleep(0.5)
        return "完成"
async def main():
    async with AsyncResource() as resource:
        result = await resource.do_something()
        print(f"结果: {result}")
asyncio.run(main())

异步迭代器

import asyncio
class AsyncCounter:
    """异步计数器"""
    def __init__(self, limit):
        self.limit = limit
        self.current = 0
    def __aiter__(self):
        return self
    async def __anext__(self):
        if self.current >= self.limit:
            raise StopAsyncIteration
        await asyncio.sleep(0.5)  # 模拟异步操作
        self.current += 1
        return self.current
async def main():
    async for number in AsyncCounter(5):
        print(f"计数: {number}")
asyncio.run(main())

常见错误处理

import asyncio
# 错误示例1:忘记await
async def bad_example():
    result = asyncio.sleep(1)  # 错误!没有await
    return result  # 返回coroutine对象,不是结果
# 正确示例
async def good_example():
    await asyncio.sleep(1)
    return "正确结果"
# 错误示例2:在同步函数中使用await
def sync_function():
    # await asyncio.sleep(1)  # 错误!不能在同步函数中使用await
    pass
# 正确示例:使用asyncio.run包装
async def async_function():
    await asyncio.sleep(1)
    print("异步执行")
# 错误示例3:阻塞主线程
async def blocking_example():
    import time
    time.sleep(5)  # 错误!会阻塞整个事件循环
    return "完成"
# 正确示例:使用asyncio.sleep
async def non_blocking_example():
    await asyncio.sleep(5)
    return "完成"
async def main():
    try:
        result = await good_example()
        print(f"成功: {result}")
    except Exception as e:
        print(f"错误: {e}")
# 运行
asyncio.run(main())

性能对比

import asyncio
import time
import random
# 同步版本
def sync_task(name, delay):
    print(f"同步任务 {name} 开始")
    time.sleep(delay)
    print(f"同步任务 {name} 结束")
    return f"任务 {name} 完成"
def sync_main():
    start = time.time()
    results = []
    for i in range(3):
        result = sync_task(i, random.uniform(1, 3))
        results.append(result)
    print(f"同步耗时: {time.time() - start:.2f}秒")
    return results
# 异步版本
async def async_task(name, delay):
    print(f"异步任务 {name} 开始")
    await asyncio.sleep(delay)
    print(f"异步任务 {name} 结束")
    return f"任务 {name} 完成"
async def async_main():
    start = time.time()
    tasks = [
        async_task(i, random.uniform(1, 3))
        for i in range(3)
    ]
    results = await asyncio.gather(*tasks)
    print(f"异步耗时: {time.time() - start:.2f}秒")
    return results
# 运行对比
print("同步版本:")
sync_results = sync_main()
print("\n异步版本:")
async_results = asyncio.run(async_main())

安装依赖

使用异步HTTP请求需要安装额外库:

pip install aiohttp aiofiles
  1. 适用场景:I/O密集型任务(网络请求、文件读写、数据库操作)
  2. 不适用场景:CPU密集型任务(计算、加密等)
  3. 关键语法async defawaitasyncio.run()
  4. 常用函数asyncio.gather()asyncio.create_task()asyncio.wait()

异步编程的核心是await关键字,它告诉Python:"这里可能需要等待,先去执行其他任务"!

抱歉,评论功能暂时关闭!