如何使用队列处理耗时任务?

wen PHP项目 41

本文目录导读:

如何使用队列处理耗时任务?

  1. 队列的基本架构
  2. 任务定义与实现
  3. 高级队列实现
  4. 生产环境最佳实践
  5. 错误处理与重试机制
  6. 监控与统计
  7. 完整使用示例
  8. 关键考虑点

下面以 Python 为例,详细说明使用队列处理耗时任务的最佳实践。

队列的基本架构

import logging
import time
from queue import Empty, Queue
from threading import Thread
class TaskQueue:
    """Thread-backed task queue that runs submitted tasks on worker threads.

    A task is any object exposing ``execute()``; its ``id`` attribute, when
    present, is echoed back in the result. Every processed task produces one
    dict on ``result_queue`` with the keys ``task_id`` and ``status`` plus
    either ``result`` (on success) or ``error`` (on failure).
    """

    def __init__(self, num_workers=4):
        """Create the task/result queues; threads start in ``start_workers``.

        Args:
            num_workers: Number of threads ``start_workers`` will spawn.
        """
        self.task_queue = Queue()
        self.result_queue = Queue()
        self.workers = []
        self.num_workers = num_workers
        self.is_running = True

    def start_workers(self):
        """Start ``num_workers`` daemon threads running the worker loop."""
        for i in range(self.num_workers):
            worker = Thread(
                target=self._worker_loop, name=f"Worker-{i}", daemon=True
            )
            worker.start()
            self.workers.append(worker)

    def stop(self, wait=True, timeout=None):
        """Ask the workers to exit and optionally wait for them.

        Args:
            wait: When true, join every worker thread before returning.
            timeout: Per-thread join timeout in seconds (``None`` = no limit).
        """
        self.is_running = False
        if wait:
            for worker in self.workers:
                worker.join(timeout)

    def _worker_loop(self):
        """Pull tasks until ``is_running`` becomes false and report outcomes."""
        while self.is_running:
            try:
                # The timeout lets the loop re-check ``is_running`` regularly
                # instead of blocking forever on an empty queue.
                task = self.task_queue.get(timeout=1)
            except Empty:
                # Only the "nothing to do yet" case is expected here; any
                # other error must not be silently discarded.
                continue

            # Tolerate tasks without an ``id`` so their outcome is still
            # reported rather than lost to an AttributeError.
            task_id = getattr(task, 'id', None)
            try:
                try:
                    outcome = {
                        'task_id': task_id,
                        'status': 'success',
                        'result': task.execute()
                    }
                except Exception as e:
                    outcome = {
                        'task_id': task_id,
                        'status': 'failed',
                        'error': str(e)
                    }
                # Publish the outcome before task_done() so that anyone
                # returning from task_queue.join() can already read it.
                self.result_queue.put(outcome)
            finally:
                self.task_queue.task_done()

    def add_task(self, task):
        """Enqueue ``task`` for execution by a worker thread."""
        self.task_queue.put(task)

    def get_result(self, timeout=None):
        """Return the next available result dict.

        Args:
            timeout: Seconds to wait; ``None`` blocks until a result arrives.

        Raises:
            queue.Empty: If ``timeout`` elapses with no result available.
        """
        return self.result_queue.get(timeout=timeout)

任务定义与实现

from abc import ABC, abstractmethod
import uuid
import time
class BaseTask(ABC):
    """Abstract unit of work that a queue worker can run.

    Each instance receives a random UUID string as ``id``, the value of
    ``time.time()`` at construction as ``created_at`` and a ``priority`` of 0.
    """

    def __init__(self):
        """Assign the identity, timestamp and default priority."""
        self.priority = 0
        self.created_at = time.time()
        self.id = str(uuid.uuid4())

    @abstractmethod
    def execute(self):
        """Run the task; concrete subclasses must override this method."""

    def __str__(self):
        """Return a short label containing the task id."""
        return f"Task({self.id})"
class ImageProcessTask(BaseTask):
    """Task that simulates applying a set of operations to an image."""

    def __init__(self, image_path, operations):
        """Store the image path and the operations to report on.

        Args:
            image_path: Path of the image; only echoed in the result string.
            operations: Operations to apply; only echoed in the result string.
        """
        super().__init__()
        self.image_path, self.operations = image_path, operations

    def execute(self):
        """Sleep two seconds to mimic slow work, then return a summary."""
        simulated_seconds = 2
        time.sleep(simulated_seconds)
        return f"Processed {self.image_path} with {self.operations}"
class EmailTask(BaseTask):
    """Task that simulates sending an e-mail."""

    def __init__(self, to, subject, body):
        """Store the recipient, subject line and message body."""
        super().__init__()
        self.to, self.subject, self.body = to, subject, body

    def execute(self):
        """Sleep one second to mimic sending, then return a confirmation."""
        simulated_seconds = 1
        time.sleep(simulated_seconds)
        return f"Email sent to {self.to}"

高级队列实现

带优先级的队列

import heapq
from dataclasses import dataclass, field
from typing import Any
@dataclass(order=True)
class PrioritizedItem:
    """Heap entry pairing a task with its sort key.

    Items are ordered by ``priority`` first and ``sequence`` second. ``task``
    is excluded from comparisons, so objects that do not support ordering can
    be queued.
    """
    priority: int
    task: Any = field(compare=False)
    # Insertion counter used as a tie-breaker between equal priorities.
    # Defaults to 0 so PrioritizedItem(priority, task) keeps working.
    sequence: int = 0


class PriorityTaskQueue:
    """Blocking priority queue built on ``heapq`` and a condition variable.

    ``heapq`` is a min-heap, so the task with the numerically smallest
    priority is returned first. Tasks sharing a priority are returned in the
    order they were added.
    """

    def __init__(self, num_workers=4):
        """Initialise the heap and its synchronisation primitives.

        Args:
            num_workers: Stored on the instance; no method of this class
                reads it.
        """
        self.queue = []
        self.result_queue = Queue()
        self.workers = []
        self.num_workers = num_workers
        # ``lock`` is not used by the methods below; kept so existing code
        # that accesses the attribute continues to work.
        self.lock = threading.Lock()
        self.condition = threading.Condition()
        # Monotonic counter; only modified while holding ``condition``.
        self._next_sequence = 0

    def add_task(self, task, priority=0):
        """Push ``task`` with the given priority and wake one waiting consumer.

        Args:
            task: Object to enqueue.
            priority: Sort key; smaller values are dequeued earlier.
        """
        with self.condition:
            entry = PrioritizedItem(priority, task, self._next_sequence)
            self._next_sequence += 1
            heapq.heappush(self.queue, entry)
            self.condition.notify()

    def get_task(self):
        """Block until a task is available and return the most urgent one."""
        with self.condition:
            # Loop rather than a single wait(): the heap is re-checked after
            # every wake-up in case another consumer already took the item.
            while not self.queue:
                self.condition.wait()
            return heapq.heappop(self.queue).task

异步队列(异步IO)

import asyncio
from asyncio import Queue as AsyncQueue
class AsyncTaskProcessor:
    """Pool of asyncio workers consuming tasks from an ``asyncio.Queue``.

    A task is any object with an awaitable ``execute()`` method.
    """

    def __init__(self, num_workers=4):
        """Create the queue; workers are spawned later by ``start``.

        Args:
            num_workers: Number of worker coroutines ``start`` launches.
        """
        self.queue = AsyncQueue()
        self.workers = []
        self.num_workers = num_workers

    async def worker(self, worker_id):
        """Process queued tasks forever, printing progress and errors.

        Args:
            worker_id: Label used in the printed messages.
        """
        while True:
            task = await self.queue.get()
            try:
                print(f"Worker {worker_id} processing {task}")
                result = await task.execute()
                print(f"Worker {worker_id} completed: {result}")
            except Exception as e:
                # A failing task is reported and the worker keeps running.
                print(f"Worker {worker_id} error: {e}")
            finally:
                # Always balance the get() so queue.join() can complete.
                self.queue.task_done()

    async def start(self):
        """Launch the workers and wait on them.

        The workers loop forever, so this coroutine only finishes after
        ``stop`` cancels them (or it is cancelled itself). The worker tasks
        are recorded in ``self.workers`` so they can be cancelled later.
        """
        launched = [
            asyncio.create_task(self.worker(i)) for i in range(self.num_workers)
        ]
        self.workers.extend(launched)
        # return_exceptions=True lets start() return normally once stop()
        # has cancelled the workers instead of raising CancelledError.
        await asyncio.gather(*launched, return_exceptions=True)

    async def stop(self):
        """Cancel every running worker and wait until they have finished."""
        pending, self.workers = self.workers, []
        for worker_task in pending:
            worker_task.cancel()
        await asyncio.gather(*pending, return_exceptions=True)

    async def add_task(self, task):
        """Put ``task`` on the queue for the next free worker."""
        await self.queue.put(task)

生产环境最佳实践

使用消息队列中间件(以 Redis 为例)

import redis
import json
import pickle
class RedisTaskQueue:
    """Task queue stored in a Redis list, with results kept in a Redis hash."""

    def __init__(self, redis_host='localhost', redis_port=6379):
        """Connect to Redis and set the key names used for tasks and results.

        Args:
            redis_host: Host name passed to ``redis.Redis``.
            redis_port: Port passed to ``redis.Redis``.
        """
        self.redis = redis.Redis(host=redis_host, port=redis_port)
        self.queue_key = 'task_queue'
        self.result_key = 'task_results'

    def add_task(self, task):
        """Serialize ``task`` and push it onto the Redis list.

        The envelope is JSON with ``task_id``, ``task_class`` and
        ``task_args``. ``task_args`` holds the pickled task as a hex string,
        because raw ``bytes`` cannot be JSON-encoded; consumers restore it
        with ``pickle.loads(bytes.fromhex(envelope['task_args']))``.

        SECURITY: unpickling executes arbitrary code. Only restore payloads
        from a Redis instance that untrusted parties cannot write to.

        Args:
            task: Picklable object exposing an ``id`` attribute.
        """
        task_data = {
            'task_id': task.id,
            'task_class': task.__class__.__name__,
            'task_args': pickle.dumps(task).hex()
        }
        self.redis.lpush(self.queue_key, json.dumps(task_data))

    def get_task(self, timeout=0):
        """Pop the oldest task envelope from the queue.

        Args:
            timeout: Passed to ``brpop``.

        Returns:
            The decoded envelope dict, or ``None`` when ``brpop`` returns
            nothing.
        """
        task_data = self.redis.brpop(self.queue_key, timeout=timeout)
        if task_data:
            # Index 1 of the brpop reply is the popped value.
            return json.loads(task_data[1])
        return None

    def store_result(self, task_id, result):
        """Save a JSON-serializable ``result`` under ``task_id``."""
        self.redis.hset(self.result_key, task_id, json.dumps(result))

    def get_result(self, task_id):
        """Return the stored result for ``task_id``, or ``None`` if absent."""
        result = self.redis.hget(self.result_key, task_id)
        if result:
            return json.loads(result)
        return None

错误处理与重试机制

class RetryTaskQueue:
    """Runs tasks with a bounded number of retries and exponential backoff."""

    def __init__(self, max_retries=3, retry_delay=5):
        """Configure the retry policy.

        Args:
            max_retries: Total number of attempts per task; values below 1
                are treated as a single attempt.
            retry_delay: Base delay in seconds before the first retry.
        """
        self.queue = Queue()
        self.retry_count = {}
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    def process_with_retry(self, task):
        """Execute ``task``, retrying on failure.

        Args:
            task: Object exposing ``execute()`` and an ``id`` attribute.

        Returns:
            Whatever ``task.execute()`` returns on the first successful attempt.

        Raises:
            Exception: The error from the final attempt, re-raised unchanged.
        """
        # Always run the task at least once, even if max_retries is 0 or
        # negative; otherwise the task would be skipped and None returned.
        attempts = max(1, self.max_retries)
        for attempt in range(attempts):
            try:
                return task.execute()
            except Exception:
                if attempt >= attempts - 1:
                    # Out of attempts: report and let the caller see the error.
                    logging.error(f"Task {task.id} failed after {attempts} attempts")
                    raise
                # Record how many retries this task has needed so far.
                self.retry_count[task.id] = attempt + 1
                logging.warning(f"Task {task.id} failed, retrying {attempt + 1}")
                # Exponential backoff: delay, 2*delay, 4*delay, ...
                time.sleep(self.retry_delay * (2 ** attempt))

监控与统计

from collections import defaultdict
import threading
import time
class MonitoredQueue:
    """Queue wrapper that keeps simple counters about submitted/finished work.

    Counters live in ``stats`` (a ``defaultdict(int)``) and are updated while
    holding ``lock``.
    """

    def __init__(self):
        """Create the underlying queue, counters, lock and start timestamp."""
        self.queue = Queue()
        self.stats = defaultdict(int)
        self.start_time = time.time()
        self.lock = threading.Lock()

    def add_task(self, task):
        """Count the submission, then enqueue ``task``."""
        with self.lock:
            self.stats['total_added'] = self.stats['total_added'] + 1
        self.queue.put(task)

    def task_completed(self, duration):
        """Record one finished task and refresh the running average.

        Args:
            duration: Time the task took, added to ``total_time``.
        """
        with self.lock:
            counters = self.stats
            counters['total_completed'] += 1
            counters['total_time'] += duration
            finished = counters['total_completed']
            counters['avg_time'] = counters['total_time'] / finished

    def get_stats(self):
        """Return a snapshot dict of queue size, counters and uptime."""
        with self.lock:
            counters = self.stats
            snapshot = {'queue_size': self.queue.qsize()}
            for key in ('total_added', 'total_completed', 'avg_time'):
                snapshot[key] = counters[key]
            snapshot['uptime'] = time.time() - self.start_time
            return snapshot

完整使用示例

def main():
    """Run three sample tasks through a TaskQueue and print each result."""
    # Set up the queue with three worker threads.
    task_queue = TaskQueue(num_workers=3)
    task_queue.start_workers()

    # Submit the sample workload.
    tasks = [
        ImageProcessTask("photo1.jpg", ["resize", "watermark"]),
        EmailTask("user@example.com", "Welcome", "Hello!"),
        ImageProcessTask("photo2.jpg", ["compress", "optimize"]),
    ]
    for task in tasks:
        task_queue.add_task(task)
        print(f"Added task: {task}")

    # Collect exactly one result per submitted task.
    remaining = len(tasks)
    while remaining > 0:
        result = task_queue.get_result(timeout=10)
        print(f"Result: {result}")
        remaining -= 1

    # Signal the worker loops to exit.
    task_queue.is_running = False


if __name__ == "__main__":
    main()

关键考虑点

  1. 线程安全:使用线程安全的队列操作
  2. 资源管理:控制工作线程数量,避免资源耗尽
  3. 错误处理:实现完善的错误处理和重试机制
  4. 监控告警:监控队列长度、处理延迟等指标
  5. 序列化:使用JSON或pickle进行任务序列化(注意:pickle 反序列化不可信数据会执行任意代码,仅应用于可信来源)
  6. 持久化:重要任务需要考虑持久化存储

队列处理耗时任务的核心是解耦任务提交和任务执行,提高系统的响应性和可扩展性。

抱歉,评论功能暂时关闭!