Python案例如何排队执行任务?

wen python案例 58

本文目录导读:

Python案例如何排队执行任务?

  1. 使用 queue.Queue 和线程池
  2. 使用 concurrent.futures.ThreadPoolExecutor
  3. 使用 asyncio 实现异步排队
  4. 使用第三方库 celery(分布式任务队列)
  5. 选择建议

在Python中实现任务排队执行有多种方法,这里介绍几种最常见的方案。

使用 queue.Queue 和线程池

这是最经典的方法,适合I/O密集型任务:

import queue
import threading
import time
import random
def worker(task_queue):
    """工作线程函数"""
    while True:
        # 从队列获取任务
        task = task_queue.get()
        if task is None:
            break  # 收到结束信号
        # 执行任务
        task_id, task_data = task
        print(f"[线程 {threading.current_thread().name}] 处理任务 {task_id}")
        time.sleep(random.uniform(0.5, 2))  # 模拟任务处理
        print(f"[线程 {threading.current_thread().name}] 完成任务 {task_id}")
        # 标记任务完成
        task_queue.task_done()
def main():
    # 创建任务队列
    task_queue = queue.Queue()
    # 启动工作线程
    num_workers = 3
    workers = []
    for i in range(num_workers):
        t = threading.Thread(target=worker, args=(task_queue,), name=f"Worker-{i+1}")
        t.daemon = True  # 设置为守护线程
        t.start()
        workers.append(t)
    # 添加任务
    for i in range(10):
        task_queue.put((i, f"Data-{i}"))
    # 等待所有任务完成
    task_queue.join()
    # 发送结束信号
    for _ in range(num_workers):
        task_queue.put(None)
    # 等待工作线程结束
    for t in workers:
        t.join()
    print("所有任务完成!")
if __name__ == "__main__":
    main()

使用 concurrent.futures.ThreadPoolExecutor

更高级的封装,代码更简洁:

from concurrent.futures import ThreadPoolExecutor
import time
import random
def process_task(task_id, task_data):
    """模拟任务处理函数"""
    print(f"处理任务 {task_id}: {task_data}")
    time.sleep(random.uniform(0.5, 2))
    return f"任务 {task_id} 完成,结果: {task_data.upper()}"
def main():
    # 创建线程池
    with ThreadPoolExecutor(max_workers=3) as executor:
        # 方式1: 提交单个任务
        futures = []
        for i in range(10):
            future = executor.submit(process_task, i, f"data_{i}")
            futures.append(future)
        # 获取所有任务结果(按提交顺序)
        results = []
        for future in futures:
            result = future.result()  # 会阻塞直到该任务完成
            results.append(result)
            print(f"获取到结果: {result}")
    print("\n所有任务顺序执行完毕!")
if __name__ == "__main__":
    main()

使用 asyncio 实现异步排队

适合I/O密集型的高并发场景:

import asyncio
import random
async def process_task(task_id, task_data):
    """异步任务处理"""
    print(f"开始处理任务 {task_id}: {task_data}")
    await asyncio.sleep(random.uniform(0.5, 2))  # 模拟异步I/O操作
    result = f"任务 {task_id} 完成"
    print(result)
    return result
async def task_queue_manager(tasks):
    """任务队列管理器"""
    semaphore = asyncio.Semaphore(3)  # 控制并发数
    async def worker(task):
        async with semaphore:
            return await process_task(*task)
    # 创建所有任务
    tasks = [worker(task) for task in tasks]
    # 等待所有任务完成
    results = await asyncio.gather(*tasks)
    return results
async def main():
    # 准备任务列表
    tasks = [(i, f"data_{i}") for i in range(10)]
    print("开始执行异步任务队列...")
    results = await task_queue_manager(tasks)
    print("\n所有任务完成:")
    for result in results:
        print(f"  - {result}")
if __name__ == "__main__":
    asyncio.run(main())

使用第三方库 celery(分布式任务队列)

适合生产环境的大型项目:

# 需要先安装: pip install celery
from celery import Celery
import time
# 创建Celery应用
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def process_task(task_id, task_data):
    """Celery任务"""
    print(f"处理任务 {task_id}: {task_data}")
    time.sleep(2)  # 模拟耗时操作
    return f"任务 {task_id} 完成"
def submit_tasks():
    """提交任务到队列"""
    tasks = []
    for i in range(10):
        # 异步提交任务
        result = process_task.delay(i, f"data_{i}")
        tasks.append(result)
    # 等待所有任务完成
    for task in tasks:
        print(f"任务结果: {task.get()}")  # get()会阻塞直到任务完成
if __name__ == "__main__":
    submit_tasks()

选择建议

  • 简单场景:使用 queue.Queue + 线程
  • 标准多线程:使用 ThreadPoolExecutor
  • 高并发I/O:使用 asyncio
  • 分布式大规模任务:使用 CeleryRabbitMQ

每种方法都有其适用场景,根据你的具体需求选择最合适的方案。

抱歉,评论功能暂时关闭!