如何用令牌桶算法实现一个本地限流器?

wen java案例 58

本文目录导读:

如何用令牌桶算法实现一个本地限流器?

  1. 核心原理
  2. Java 实现
  3. 优化版本:基于队列的时间戳实现
  4. 使用注意事项

使用令牌桶算法实现本地限流器,可以通过以下几个关键步骤完成,这里给出一个 Java 实现的完整示例,并附上核心思路说明。

核心原理

令牌桶算法的核心思想:

  • 以固定速率向桶中添加令牌
  • 每个请求消耗一个令牌
  • 桶有上限容量,防止突发流量过大
  • 如果桶中无可用令牌,请求被拒绝或等待

Java 实现

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class TokenBucketRateLimiter {
    // 桶的最大容量
    private final long capacity;
    // 令牌生成速率(每秒生成令牌数)
    private final double tokensPerSecond;
    // 当前桶中令牌数量
    private double currentTokens;
    // 上次令牌补充时间(纳秒)
    private long lastRefillTime;
    // 并发控制锁
    private final Lock lock = new ReentrantLock();
    /**
     * 构造函数
     * @param capacity 桶容量
     * @param tokensPerSecond 每秒生成令牌数
     */
    public TokenBucketRateLimiter(long capacity, double tokensPerSecond) {
        this.capacity = capacity;
        this.tokensPerSecond = tokensPerSecond;
        this.currentTokens = capacity; // 初始满桶
        this.lastRefillTime = System.nanoTime();
    }
    /**
     * 尝试获取一个令牌(立即返回)
     * @return 是否获取成功
     */
    public boolean tryAcquire() {
        return tryAcquire(1);
    }
    /**
     * 尝试获取指定数量的令牌(立即返回)
     * @param tokens 需要的令牌数
     * @return 是否获取成功
     */
    public boolean tryAcquire(int tokens) {
        lock.lock();
        try {
            refillTokens();
            if (currentTokens >= tokens) {
                currentTokens -= tokens;
                return true;
            }
            return false;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 获取一个令牌(等待直到获取)
     * @throws InterruptedException 如果等待时被中断
     */
    public void acquire() throws InterruptedException {
        acquire(1);
    }
    /**
     * 获取指定数量的令牌(等待直到获取)
     * @param tokens 需要的令牌数
     * @throws InterruptedException 如果等待时被中断
     */
    public void acquire(int tokens) throws InterruptedException {
        long waitTime = reserve(tokens);
        if (waitTime > 0) {
            Thread.sleep(waitTime);
        }
    }
    /**
     * 预留令牌,返回需要等待的时间
     */
    private long reserve(int tokens) {
        lock.lock();
        try {
            refillTokens();
            // 计算当前令牌是否足够
            if (currentTokens >= tokens) {
                currentTokens -= tokens;
                return 0;
            }
            // 计算需要等待的时间(毫秒)
            // 缺少的令牌数
            double deficit = tokens - currentTokens;
            // 需要的等待时间(毫秒)
            long waitTimeMs = (long)(deficit / tokensPerSecond * 1000);
            // 当前使用,但队列中等待(这里是预扣除)
            currentTokens = -deficit; // 表示已经欠费
            return waitTimeMs;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 补充令牌
     */
    private void refillTokens() {
        long now = System.nanoTime();
        // 计算时间差(秒)
        double elapsedSeconds = (now - lastRefillTime) / 1_000_000_000.0;
        // 计算应该生成的令牌数
        double newTokens = elapsedSeconds * tokensPerSecond;
        // 更新令牌数量(不超过容量)
        currentTokens = Math.min(capacity, currentTokens + newTokens);
        lastRefillTime = now;
    }
    /**
     * 获取当前桶中令牌数(近似值)
     */
    public double getCurrentTokens() {
        lock.lock();
        try {
            refillTokens();
            return currentTokens;
        } finally {
            lock.unlock();
        }
    }
    /**
     * 重置限流器
     */
    public void reset() {
        lock.lock();
        try {
            currentTokens = capacity;
            lastRefillTime = System.nanoTime();
        } finally {
            lock.unlock();
        }
    }
    // 使用示例
    public static void main(String[] args) throws InterruptedException {
        // 创建限流器:桶容量5,每秒生成2个令牌
        TokenBucketRateLimiter limiter = new TokenBucketRateLimiter(5, 2);
        // 测试非阻塞获取
        for (int i = 0; i < 10; i++) {
            boolean acquired = limiter.tryAcquire();
            System.out.println("请求 " + i + ": " + (acquired ? "通过" : "被限流") 
                             + " (当前令牌数: " + limiter.getCurrentTokens() + ")");
            Thread.sleep(200);
        }
        System.out.println("\n等待2秒后...");
        Thread.sleep(2000);
        // 测试阻塞获取
        System.out.println("尝试阻塞获取2个令牌...");
        long start = System.currentTimeMillis();
        limiter.acquire(2);
        long end = System.currentTimeMillis();
        System.out.println("获取成功,等待时间: " + (end - start) + "ms");
    }
}

优化版本:基于队列的时间戳实现

对于高并发场景,上述实现中 synchronizedReentrantLock 可能成为瓶颈,以下是基于时间戳队列的高性能实现:

import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicLong;
public class HighPerformanceTokenBucket {
    // 每个令牌生成的最小时间间隔(纳秒)
    private final long intervalNanos;
    // 桶容量
    private final long capacity;
    // 上次检查时间(用于非精准但性能更好的计算)
    private final AtomicLong lastCheckNanos;
    // 当前可用令牌的近似值(仅用于快速判断,不保证精确)
    private volatile double availableTokens;
    // 精确模式:使用队列记录每个令牌的时间
    private final boolean preciseMode;
    private final ConcurrentLinkedQueue<Long> tokenTimes;
    public HighPerformanceTokenBucket(long capacity, double tokensPerSecond) {
        this.capacity = capacity;
        this.intervalNanos = (long)(1_000_000_000L / tokensPerSecond);
        this.lastCheckNanos = new AtomicLong(System.nanoTime());
        this.availableTokens = capacity;
        this.preciseMode = false; // 默认非精确模式
        this.tokenTimes = null;
    }
    /**
     * 高性能非阻塞获取(近似实现)
     */
    public boolean tryAcquire() {
        while (true) {
            long now = System.nanoTime();
            long lastCheck = lastCheckNanos.get();
            // 计算自上次检查后生成的令牌数
            long elapsed = now - lastCheck;
            double newTokens = (double) elapsed / intervalNanos;
            // 更新状态(CAS)
            if (lastCheckNanos.compareAndSet(lastCheck, now)) {
                double tokens = Math.min(capacity, availableTokens + newTokens);
                if (tokens >= 1) {
                    availableTokens = tokens - 1;
                    return true;
                } else {
                    availableTokens = tokens;
                    return false;
                }
            }
            // CAS失败,重试
            Thread.yield();
        }
    }
}

使用注意事项

  1. 线程安全:必须使用锁或CAS操作保证并发安全

  2. 时间精度:使用 System.nanoTime() 而非 System.currentTimeMillis(),避免系统时间调整带来的问题

  3. 性能考虑

    • 低并发场景:使用锁实现即可
    • 高并发场景:考虑CAS或分段锁
  4. 实际应用建议

    • 可以集成到 FilterInterceptor 中实现全局限流
    • 结合 Guava RateLimiter 等成熟库使用
    • 注意设置合理的桶容量和速率参数
  5. 扩展功能

    • 可以加入预热模式,让限流器逐渐达到最大速率
    • 可以支持不同优先级的请求

这个实现足够应对大多数本地限流场景,如果需要在分布式环境中使用,则需要考虑使用 Redis 等中间件实现分布式限流。

抱歉,评论功能暂时关闭!