如何用Java案例实现任务队列?

wen java案例 2

如何用Java案例实现任务队列?

目录导读

  1. 任务队列的核心场景与价值
  2. 任务队列的底层原理与数据结构选择
  3. 基础案例:基于BlockingQueue的线程安全任务队列
  4. 进阶案例:支持优先级与延迟执行的复杂队列
  5. 性能优化:拒绝策略与动态扩缩容
  6. 高频问答与实战避坑

任务队列的核心场景与价值

任务队列是一种“生产者-消费者”模式的经典实现,用于解耦任务的提交执行,在Java应用中,常见场景包括:

如何用Java案例实现任务队列?

  • 异步处理(如邮件发送、日志写入)
  • 请求削峰(秒杀系统、API限流)
  • 后台批处理(数据清洗、报告生成)

核心价值

  • 提升系统吞吐量(允许任务批量处理)
  • 降低响应延迟(主线程无需等待任务完成)
  • 增强系统可靠性(队列作为缓冲,可抗瞬时流量)

任务队列的底层原理与数据结构选择

1 生产者-消费者模式

任务队列本质是一个线程安全的容器:

  • 生产者线程负责将任务对象放入队列(offer()put()
  • 消费者线程从队列取出任务并执行(poll()take()

2 常用数据结构对比

数据结构 线程安全 特性 适用场景
ArrayBlockingQueue 有界队列,基于数组 内存可控的固定容量场景
LinkedBlockingQueue 可选有界/无界,基于链表 高吞吐但需控制内存
PriorityBlockingQueue 支持优先级排序 任务有紧急度差异
DelayQueue 元素需实现Delayed接口 延迟执行、超时重试

选择建议:生产环境首选LinkedBlockingQueue(兼顾性能与容量灵活性);需要定时任务时用ScheduledThreadPoolExecutor作为包装。


基础案例:基于BlockingQueue的线程安全任务队列

1 核心代码示例

import java.util.concurrent.*;
public class SimpleTaskQueue {
    // 定义有界任务队列,容量100
    private final BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(100);
    // 定义消费者线程池(2个核心线程,最大4个,保活30秒)
    private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
            2, 4, 30L, TimeUnit.SECONDS, taskQueue);
    // 提交任务
    public void submitTask(Runnable task) {
        try {
            // 非阻塞入队,若队列满则抛出IllegalStateException
            taskQueue.offer(task, 5, TimeUnit.SECONDS); 
            System.out.println("任务已加入队列,当前队列大小: " + taskQueue.size());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            System.out.println("任务提交失败");
        }
    }
    // 启动消费者(已在线程池中自动处理)
    public SimpleTaskQueue() {
        // 线程池已包含消费逻辑
        System.out.println("任务队列系统已启动");
    }
}

2 单元测试验证

public static void main(String[] args) {
    SimpleTaskQueue queue = new SimpleTaskQueue();
    for (int i = 0; i < 10; i++) {
        final int taskId = i;
        queue.submitTask(() -> {
            try { Thread.sleep(500); } catch (InterruptedException e) {}
            System.out.println(Thread.currentThread().getName() + " 完成任务:" + taskId);
        });
    }
}

输出示例

任务队列系统已启动  
任务已加入队列,当前队列大小: 1  
...  
pool-1-thread-1 完成任务:0  
pool-1-thread-2 完成任务:1  

注意:实际开发中建议将executor声明为private final,并通过依赖注入管理池大小参数。


进阶案例:支持优先级与延迟执行的复杂队列

1 需求分析

某订单系统需实现:

  • 普通订单30秒内处理
  • VIP订单立即处理
  • 超时未支付的订单延迟10秒后重试

2 使用DelayQueue+优先级组合

public class DelayedTask implements Delayed, Runnable {
    private final long startTime;  // 时间戳(毫秒)
    private final int priority;    // 0=普通, 1=VIP
    private final String taskName;
    public DelayedTask(long delayMs, int priority, String name) {
        this.startTime = System.currentTimeMillis() + delayMs;
        this.priority = priority;
        this.taskName = name;
    }
    @Override
    public long getDelay(TimeUnit unit) {
        long diff = startTime - System.currentTimeMillis();
        return unit.convert(diff, TimeUnit.MILLISECONDS);
    }
    @Override
    public int compareTo(Delayed o) {
        DelayedTask other = (DelayedTask) o;
        // 优先级高的先执行,相同优先级按延迟时间排序
        if (this.priority != other.priority) {
            return other.priority - this.priority; // VIP(1) > 普通(0)
        }
        return Long.compare(this.startTime, other.startTime);
    }
    @Override
    public void run() {
        System.out.println("执行任务: " + taskName + " (优先级" + priority + ")");
    }
}

3 消费者线程处理

DelayQueue<DelayedTask> delayQueue = new DelayQueue<>();
// 生产者:订单创建后加入队列
delayQueue.put(new DelayedTask(0, 1, "VIP订单A"));    // VIP立即执行
delayQueue.put(new DelayedTask(30000, 0, "普通订单B")); // 普通延迟30秒
// 消费者:轮询取出到期任务
new Thread(() -> {
    while (true) {
        try {
            DelayedTask task = delayQueue.take(); // 阻塞直到到期
            task.run();
        } catch (InterruptedException e) {
            break;
        }
    }
}).start();

运行效果:VIP订单在提交后立即执行,普通订单等待30秒后执行。


性能优化:拒绝策略与动态扩缩容

1 常见的拒绝策略(当队列满且线程数达最大时)

策略 行为
AbortPolicy (默认) 抛出RejectedExecutionException,需调用方自行处理
CallerRunsPolicy 由提交任务的线程直接执行(降低生产者速度)
DiscardPolicy 静默丢弃最新任务(适合不重要的日志)
DiscardOldestPolicy 丢弃队列头部的等待任务(可接受短暂数据丢失的场景)

推荐组合LinkedBlockingQueue + CallerRunsPolicy,避免客户端线程无限阻塞。

2 动态扩缩容实现

public class DynamicQueue {
    private final ThreadPoolExecutor executor = 
        new ThreadPoolExecutor(2, 10, 60L, TimeUnit.SECONDS, 
            new LinkedBlockingQueue<>(200));
    public void adjustPoolSize(int core, int max) {
        executor.setCorePoolSize(core);
        executor.setMaximumPoolSize(max);
    }
}

监控指标:通过executor.getQueue().size()executor.getActiveCount()判断是否需要扩容。


高频问答与实战避坑

问1:任务队列与线程池的区别是什么?

  • 任务队列侧重数据缓冲(存储任务)
  • 线程池侧重执行管理(复用线程)
  • 生产环境:将队列作为线程池的任务存储容器,二者结合使用。

问2:任务队列会不会导致内存溢出?

  • 必须使用有界队列(如LinkedBlockingQueue(MAX)
  • 配合offer(timeout)进行超时等待,拒绝时走预设策略

问3:如何保证任务不丢失?

  • 持久化:队列中的任务同时写入数据库(如使用RabbitMQ
  • 确认机制:任务执行成功后再从队列移除(避免宕机丢失)
  1. 慎用无界队列:可能无限膨胀导致OOM。
  2. 优先级队列需实现Comparable:否则会按插入顺序混乱执行。
  3. 延迟队列需注意时间精度System.nanoTime()优于currentTimeMillis()在高精度场景。
  4. 线程安全:使用ConcurrentHashMap存储任务状态,避免并发修改。

通过以上案例,你已经掌握了从基础的有界队列到复杂的延迟优先级队列的实现方法,根据业务规模,可进一步结合分布式任务调度框架(如Quartz、XXL-JOB)扩展至微服务环境。

抱歉,评论功能暂时关闭!