本文目录导读:

- 使用
queue.Queue和线程池 - 使用
concurrent.futures.ThreadPoolExecutor - 使用
asyncio实现异步排队 - 使用第三方库
celery(分布式任务队列) - 选择建议
在Python中实现任务排队执行有多种方法,这里介绍几种最常见的方案。
使用 queue.Queue 和线程池
这是最经典的方法,适合I/O密集型任务:
import queue
import threading
import time
import random
def worker(task_queue):
"""工作线程函数"""
while True:
# 从队列获取任务
task = task_queue.get()
if task is None:
break # 收到结束信号
# 执行任务
task_id, task_data = task
print(f"[线程 {threading.current_thread().name}] 处理任务 {task_id}")
time.sleep(random.uniform(0.5, 2)) # 模拟任务处理
print(f"[线程 {threading.current_thread().name}] 完成任务 {task_id}")
# 标记任务完成
task_queue.task_done()
def main():
# 创建任务队列
task_queue = queue.Queue()
# 启动工作线程
num_workers = 3
workers = []
for i in range(num_workers):
t = threading.Thread(target=worker, args=(task_queue,), name=f"Worker-{i+1}")
t.daemon = True # 设置为守护线程
t.start()
workers.append(t)
# 添加任务
for i in range(10):
task_queue.put((i, f"Data-{i}"))
# 等待所有任务完成
task_queue.join()
# 发送结束信号
for _ in range(num_workers):
task_queue.put(None)
# 等待工作线程结束
for t in workers:
t.join()
print("所有任务完成!")
if __name__ == "__main__":
main()
使用 concurrent.futures.ThreadPoolExecutor
更高级的封装,代码更简洁:
from concurrent.futures import ThreadPoolExecutor
import time
import random
def process_task(task_id, task_data):
"""模拟任务处理函数"""
print(f"处理任务 {task_id}: {task_data}")
time.sleep(random.uniform(0.5, 2))
return f"任务 {task_id} 完成,结果: {task_data.upper()}"
def main():
# 创建线程池
with ThreadPoolExecutor(max_workers=3) as executor:
# 方式1: 提交单个任务
futures = []
for i in range(10):
future = executor.submit(process_task, i, f"data_{i}")
futures.append(future)
# 获取所有任务结果(按提交顺序)
results = []
for future in futures:
result = future.result() # 会阻塞直到该任务完成
results.append(result)
print(f"获取到结果: {result}")
print("\n所有任务顺序执行完毕!")
if __name__ == "__main__":
main()
使用 asyncio 实现异步排队
适合I/O密集型的高并发场景:
import asyncio
import random
async def process_task(task_id, task_data):
"""异步任务处理"""
print(f"开始处理任务 {task_id}: {task_data}")
await asyncio.sleep(random.uniform(0.5, 2)) # 模拟异步I/O操作
result = f"任务 {task_id} 完成"
print(result)
return result
async def task_queue_manager(tasks):
"""任务队列管理器"""
semaphore = asyncio.Semaphore(3) # 控制并发数
async def worker(task):
async with semaphore:
return await process_task(*task)
# 创建所有任务
tasks = [worker(task) for task in tasks]
# 等待所有任务完成
results = await asyncio.gather(*tasks)
return results
async def main():
# 准备任务列表
tasks = [(i, f"data_{i}") for i in range(10)]
print("开始执行异步任务队列...")
results = await task_queue_manager(tasks)
print("\n所有任务完成:")
for result in results:
print(f" - {result}")
if __name__ == "__main__":
asyncio.run(main())
使用第三方库 celery(分布式任务队列)
适合生产环境的大型项目:
# 需要先安装: pip install celery
from celery import Celery
import time
# 创建Celery应用
app = Celery('tasks', broker='redis://localhost:6379/0')
@app.task
def process_task(task_id, task_data):
"""Celery任务"""
print(f"处理任务 {task_id}: {task_data}")
time.sleep(2) # 模拟耗时操作
return f"任务 {task_id} 完成"
def submit_tasks():
"""提交任务到队列"""
tasks = []
for i in range(10):
# 异步提交任务
result = process_task.delay(i, f"data_{i}")
tasks.append(result)
# 等待所有任务完成
for task in tasks:
print(f"任务结果: {task.get()}") # get()会阻塞直到任务完成
if __name__ == "__main__":
submit_tasks()
选择建议
- 简单场景:使用
queue.Queue+ 线程 - 标准多线程:使用
ThreadPoolExecutor - 高并发I/O:使用
asyncio - 分布式大规模任务:使用
Celery或RabbitMQ
每种方法都有其适用场景,根据你的具体需求选择最合适的方案。