本文目录导读:

下面详细说明使用队列处理耗时任务的最佳实践。
队列的基本架构
import inspect
import logging
import threading
import time
from queue import Empty
from queue import Queue
from threading import Thread
class TaskQueue:
    """Thread-backed work queue: tasks go in, result records come out.

    Submitted tasks must provide ``execute()``. Their ``id`` attribute, when
    present, is echoed back in the result record (``None`` otherwise).

    Attributes:
        task_queue: Pending tasks waiting for a worker.
        result_queue: One result dict per processed task.
        workers: Threads started by ``start_workers``.
        num_workers: Number of threads ``start_workers`` launches.
        is_running: Workers keep polling while this is true.
    """

    def __init__(self, num_workers=4):
        """Create the queues; no thread is started until ``start_workers``."""
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.workers = []
        self.num_workers = num_workers
        self.is_running = True

    def start_workers(self):
        """Start ``num_workers`` daemon threads running the worker loop."""
        for i in range(self.num_workers):
            worker = Thread(target=self._worker_loop, name=f"Worker-{i}")
            worker.daemon = True
            worker.start()
            self.workers.append(worker)

    def _worker_loop(self):
        """Pull tasks until ``is_running`` is cleared, recording each outcome."""
        while self.is_running:
            try:
                # The timeout lets the loop re-check ``is_running`` regularly
                # instead of blocking forever on an empty queue.
                task = self.task_queue.get(timeout=1)
            except Empty:
                # Only "nothing to do yet" is expected here; any other error
                # is no longer silently swallowed.
                continue

            # Resolve the id once, tolerantly, so that building the result
            # record cannot itself raise and lose the outcome.
            task_id = getattr(task, 'id', None)
            try:
                try:
                    result = task.execute()
                except Exception as e:
                    outcome = {
                        'task_id': task_id,
                        'status': 'failed',
                        'error': str(e)
                    }
                else:
                    outcome = {
                        'task_id': task_id,
                        'status': 'success',
                        'result': result
                    }
                # Publish before task_done() so a caller that joins the task
                # queue finds every result already available.
                self.result_queue.put(outcome)
            finally:
                self.task_queue.task_done()

    def add_task(self, task):
        """Queue ``task`` for execution by a worker."""
        self.task_queue.put(task)

    def get_result(self, timeout=None):
        """Return the next result record, waiting up to ``timeout`` seconds.

        Raises:
            queue.Empty: if no result arrives within ``timeout``.
        """
        return self.result_queue.get(timeout=timeout)
任务定义与实现
from abc import ABC, abstractmethod
import uuid
import time
class BaseTask(ABC):
    """Abstract unit of work; concrete tasks implement ``execute``."""

    def __init__(self):
        """Assign a fresh id, a creation timestamp and the default priority."""
        self.priority = 0
        self.created_at = time.time()
        # A random UUID rendered as text identifies this task instance.
        identifier = uuid.uuid4()
        self.id = str(identifier)

    @abstractmethod
    def execute(self):
        """Run the task; every subclass must override this."""

    def __str__(self):
        text = f"Task({self.id})"
        return text
class ImageProcessTask(BaseTask):
    """Image-processing task: one image path plus the operations to apply."""

    def __init__(self, image_path, operations):
        """Remember which image to process and with which operations."""
        super().__init__()
        self.operations = operations
        self.image_path = image_path

    def execute(self):
        """Simulate the slow work and return a summary string."""
        # A two-second pause stands in for the real processing.
        time.sleep(2)
        summary = f"Processed {self.image_path} with {self.operations}"
        return summary
class EmailTask(BaseTask):
    """Email-sending task holding recipient, subject and body."""

    def __init__(self, to, subject, body):
        """Store the message fields after the base-class setup."""
        super().__init__()
        self.body = body
        self.subject = subject
        self.to = to

    def execute(self):
        """Pause for one second, then report the recipient."""
        time.sleep(1)
        confirmation = f"Email sent to {self.to}"
        return confirmation
高级队列实现
带优先级的队列
import heapq
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PrioritizedItem:
    """Heap entry pairing a task with its priority.

    Ordering and equality look at ``priority`` only; ``task`` is excluded
    from comparisons, so the payload never needs to be comparable itself.
    """

    priority: int
    task: Any = field(compare=False)
class PriorityTaskQueue:
    """Heap-based task store that hands out the lowest priority value first.

    Attributes:
        queue: The heap (a plain list managed through ``heapq``).
        result_queue: Queue available for task results.
        workers: List reserved for worker handles (not filled by this class).
        num_workers: Requested worker count (not used by this class itself).
        lock: Spare lock kept for callers; the methods below synchronise on
            ``condition`` only.
        condition: Guards ``queue`` and wakes consumers waiting for a task.
    """

    def __init__(self, num_workers=4):
        """Create an empty heap together with its synchronisation objects."""
        self.queue = []
        self.result_queue = Queue()
        self.workers = []
        self.num_workers = num_workers
        self.lock = threading.Lock()
        self.condition = threading.Condition()

    def add_task(self, task, priority=0):
        """Push ``task`` with ``priority`` and wake one waiting consumer."""
        with self.condition:
            heapq.heappush(self.queue, PrioritizedItem(priority, task))
            self.condition.notify()

    def get_task(self, timeout=None):
        """Pop and return the task with the smallest priority value.

        Args:
            timeout: Maximum number of seconds to wait for a task. ``None``
                (the default) waits indefinitely, as before.

        Returns:
            The task, or ``None`` when ``timeout`` expires with the heap
            still empty.
        """
        with self.condition:
            # wait_for re-checks the predicate after every wake-up, so a
            # spurious wake-up or a competing consumer cannot cause a pop
            # from an empty heap.
            if not self.condition.wait_for(lambda: self.queue, timeout=timeout):
                return None
            return heapq.heappop(self.queue).task
异步队列(异步IO)
import asyncio
from asyncio import Queue as AsyncQueue
class AsyncTaskProcessor:
    """asyncio-based processor that drains a queue with several workers.

    Tasks may implement ``execute`` either as a coroutine function or as a
    regular function. Regular functions run in the loop's default executor
    so that a slow task does not stall the event loop.

    Attributes:
        queue: asyncio queue of pending tasks.
        workers: asyncio tasks created by ``start`` (empty before that).
        num_workers: Number of worker coroutines ``start`` launches.
    """

    def __init__(self, num_workers=4):
        """Create the queue; workers are only launched by ``start``."""
        self.queue = AsyncQueue()
        self.workers = []
        self.num_workers = num_workers

    async def worker(self, worker_id):
        """Process queued tasks forever, printing each outcome."""
        loop = asyncio.get_running_loop()
        while True:
            task = await self.queue.get()
            try:
                print(f"Worker {worker_id} processing {task}")
                execute = task.execute
                if inspect.iscoroutinefunction(execute):
                    result = await execute()
                else:
                    # Awaiting a plain return value raises TypeError, so
                    # synchronous tasks are run in the executor instead.
                    result = await loop.run_in_executor(None, execute)
                    # A plain callable may still hand back an awaitable.
                    if inspect.isawaitable(result):
                        result = await result
                print(f"Worker {worker_id} completed: {result}")
            except Exception as e:
                print(f"Worker {worker_id} error: {e}")
            finally:
                self.queue.task_done()

    async def start(self):
        """Launch the workers and wait on them (they loop until cancelled)."""
        # Keeping the task handles lets callers cancel individual workers.
        self.workers = [
            asyncio.create_task(self.worker(i))
            for i in range(self.num_workers)
        ]
        await asyncio.gather(*self.workers)

    async def add_task(self, task):
        """Put ``task`` on the queue."""
        await self.queue.put(task)
生产环境最佳实践
使用消息队列中间件(Redis为例)
import redis
import json
import pickle
class RedisTaskQueue:
    """Task queue and result store kept in Redis.

    Tasks live in the list ``queue_key`` as JSON documents; results live in
    the hash ``result_key``, keyed by task id.
    """

    def __init__(self, redis_host='localhost', redis_port=6379):
        """Create the Redis client and fix the key names used for storage."""
        self.redis = redis.Redis(host=redis_host, port=redis_port)
        self.queue_key = 'task_queue'
        self.result_key = 'task_results'

    def add_task(self, task):
        """Serialise ``task`` and push it onto the Redis list.

        The JSON document carries ``task_id``, ``task_class`` and
        ``task_args``. ``task_args`` is the pickled task as a hex string,
        because JSON cannot hold raw bytes. Consumers rebuild the object
        with ``pickle.loads(bytes.fromhex(data['task_args']))``.

        SECURITY: unpickling runs arbitrary code. Only unpickle payloads
        from a Redis instance that untrusted parties cannot write to.
        """
        task_data = {
            'task_id': task.id,
            'task_class': task.__class__.__name__,
            'task_args': pickle.dumps(task).hex()
        }
        self.redis.lpush(self.queue_key, json.dumps(task_data))

    def get_task(self, timeout=0):
        """Pop the next task document from the queue.

        Args:
            timeout: Passed straight to ``brpop``. Presumably 0 blocks until
                an item arrives - confirm against the redis-py docs.

        Returns:
            The decoded task dict, or ``None`` if ``brpop`` returned nothing.
        """
        task_data = self.redis.brpop(self.queue_key, timeout=timeout)
        if not task_data:
            return None
        # The reply is indexed as a pair; the payload is its second element.
        return json.loads(task_data[1])

    def store_result(self, task_id, result):
        """Save ``result`` (JSON-encoded) under ``task_id``."""
        self.redis.hset(self.result_key, task_id, json.dumps(result))

    def get_result(self, task_id):
        """Return the stored result for ``task_id``, or ``None`` if absent."""
        result = self.redis.hget(self.result_key, task_id)
        if not result:
            return None
        return json.loads(result)
错误处理与重试机制
class RetryTaskQueue:
    """Runs tasks with a bounded number of attempts and exponential backoff.

    Attributes:
        queue: Queue available for pending tasks.
        retry_count: Maps task id to the number of retries performed so far.
        max_retries: Total number of attempts per task (values below 1 are
            treated as 1, so the task always runs at least once).
        retry_delay: Base delay in seconds, doubled after every failure.
    """

    def __init__(self, max_retries=3, retry_delay=5):
        """Store the retry policy and create the empty bookkeeping."""
        self.queue = Queue()
        self.retry_count = {}
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def process_with_retry(self, task):
        """Execute ``task``, retrying after failures.

        Returns:
            Whatever ``task.execute()`` returns on the first success.

        Raises:
            Exception: the error of the final attempt, re-raised unchanged
                once all attempts are used up.
        """
        # Always attempt at least once; otherwise a non-positive
        # ``max_retries`` would skip the task and return None.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                return task.execute()
            except Exception:
                if attempt >= attempts - 1:
                    # Out of attempts: report and let the caller see the error.
                    logging.error(f"Task {task.id} failed after {attempts} attempts")
                    raise
                # Record how many retries this task has needed so far.
                self.retry_count[task.id] = attempt + 1
                logging.warning(f"Task {task.id} failed, retrying {attempt + 1}")
                # Exponential backoff: delay, 2*delay, 4*delay, ...
                time.sleep(self.retry_delay * 2 ** attempt)
监控与统计
from collections import defaultdict
import threading
import time
class MonitoredQueue:
    """Queue wrapper that keeps simple throughput counters.

    Attributes:
        queue: The underlying task queue.
        stats: Counters (``total_added``, ``total_completed``,
            ``total_time``, ``avg_time``); missing counters read as 0.
        start_time: Creation timestamp, used to compute uptime.
        lock: Serialises every update and read of ``stats``.
    """

    def __init__(self):
        """Start with an empty queue and zeroed counters."""
        self.lock = threading.Lock()
        self.start_time = time.time()
        self.stats = defaultdict(int)
        self.queue = Queue()

    def add_task(self, task):
        """Count ``task`` as added and enqueue it."""
        with self.lock:
            self.stats['total_added'] = self.stats['total_added'] + 1
            self.queue.put(task)

    def task_completed(self, duration):
        """Record one finished task that took ``duration`` and refresh the mean."""
        with self.lock:
            counters = self.stats
            counters['total_completed'] += 1
            counters['total_time'] += duration
            finished = counters['total_completed']
            counters['avg_time'] = counters['total_time'] / finished

    def get_stats(self):
        """Return a snapshot dict of queue size, counters and uptime."""
        with self.lock:
            snapshot = {'queue_size': self.queue.qsize()}
            for name in ('total_added', 'total_completed', 'avg_time'):
                snapshot[name] = self.stats[name]
            snapshot['uptime'] = time.time() - self.start_time
            return snapshot
完整使用示例
def main():
    """Demo: push three sample tasks through a TaskQueue and print results."""
    # Set up the queue with three workers.
    task_queue = TaskQueue(num_workers=3)
    task_queue.start_workers()

    # Build the sample tasks, then enqueue them one by one.
    first_image = ImageProcessTask("photo1.jpg", ["resize", "watermark"])
    welcome_mail = EmailTask("user@example.com", "Welcome", "Hello!")
    second_image = ImageProcessTask("photo2.jpg", ["compress", "optimize"])
    tasks = [first_image, welcome_mail, second_image]
    for task in tasks:
        task_queue.add_task(task)
        print(f"Added task: {task}")

    # Collect exactly one result per submitted task.
    remaining = len(tasks)
    while remaining > 0:
        result = task_queue.get_result(timeout=10)
        print(f"Result: {result}")
        remaining -= 1

    # Tell the workers to stop polling.
    task_queue.is_running = False


if __name__ == "__main__":
    main()
关键考虑点
- 线程安全:使用线程安全的队列操作
- 资源管理:控制工作线程数量,避免资源耗尽
- 错误处理:实现完善的错误处理和重试机制
- 监控告警:监控队列长度、处理延迟等指标
- 序列化:使用JSON或pickle进行任务序列化;pickle 只能用于反序列化来自可信来源的数据,否则可能导致任意代码执行
- 持久化:重要任务需要考虑持久化存储
队列处理耗时任务的核心是解耦任务提交和任务执行,提高系统的响应性和可扩展性。