Python案例怎么实现限流防护?

wen python案例 26

本文目录导读:

Python案例怎么实现限流防护?

  1. 计数器法(简单实现)
  2. 滑动窗口算法
  3. 令牌桶算法
  4. 漏桶算法
  5. 基于Redis的分布式限流
  6. 装饰器实现(通用方案)
  7. 综合实现(Flask示例)
  8. 使用建议

我来介绍几种Python实现限流防护的常用方法:

计数器法(简单实现)

import time
from threading import Lock
class CounterLimiter:
    """基于计数器的限流器"""
    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counter = 0
        self.window_start = time.time()
        self.lock = Lock()
    def allow_request(self) -> bool:
        with self.lock:
            now = time.time()
            # 重置时间窗口
            if now - self.window_start >= self.window_seconds:
                self.counter = 0
                self.window_start = now
            if self.counter < self.max_requests:
                self.counter += 1
                return True
            return False
# 使用示例
limiter = CounterLimiter(max_requests=10, window_seconds=1)
for i in range(15):
    if limiter.allow_request():
        print(f"请求 {i}: 允许")
    else:
        print(f"请求 {i}: 拒绝")

滑动窗口算法

from collections import deque
import time
from threading import Lock
class SlidingWindowLimiter:
    """滑动窗口限流器"""
    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.requests = deque()
        self.lock = Lock()
    def allow_request(self) -> bool:
        with self.lock:
            now = time.time()
            # 移除过期请求
            while self.requests and now - self.requests[0] > self.window_seconds:
                self.requests.popleft()
            # 检查是否超过限制
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            return False
# 使用示例
sliding_limiter = SlidingWindowLimiter(max_requests=5, window_seconds=10)
for i in range(10):
    if sliding_limiter.allow_request():
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
    time.sleep(0.5)

令牌桶算法

import time
from threading import Lock
class TokenBucketLimiter:
    """令牌桶限流器"""
    def __init__(self, rate=10, capacity=20):
        """
        :param rate: 令牌生成速率(每秒)
        :param capacity: 桶容量
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        self.last_refill = time.time()
        self.lock = Lock()
    def _refill(self):
        """补充令牌"""
        now = time.time()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now
    def allow_request(self, tokens_needed=1) -> bool:
        with self.lock:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False
# 使用示例
token_limiter = TokenBucketLimiter(rate=5, capacity=10)
for i in range(15):
    if token_limiter.allow_request():
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
    time.sleep(0.1)

漏桶算法

import time
from threading import Lock
class LeakyBucketLimiter:
    """漏桶限流器"""
    def __init__(self, rate=10, capacity=20):
        """
        :param rate: 漏出速率(每秒)
        :param capacity: 桶容量
        """
        self.rate = rate
        self.capacity = capacity
        self.water = 0
        self.last_leak = time.time()
        self.lock = Lock()
    def _leak(self):
        """漏水"""
        now = time.time()
        elapsed = now - self.last_leak
        leaked = elapsed * self.rate
        self.water = max(0, self.water - leaked)
        self.last_leak = now
    def allow_request(self) -> bool:
        with self.lock:
            self._leak()
            if self.water < self.capacity:
                self.water += 1
                return True
            return False
# 使用示例
leaky_limiter = LeakyBucketLimiter(rate=3, capacity=5)
for i in range(10):
    if leaky_limiter.allow_request():
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
    time.sleep(0.2)

基于Redis的分布式限流

import redis
import time
class RedisSlidingWindowLimiter:
    """基于Redis的滑动窗口限流"""
    def __init__(self, redis_client, key="rate_limit", max_requests=100, window_seconds=60):
        self.redis = redis_client
        self.key = key
        self.max_requests = max_requests
        self.window_seconds = window_seconds
    def allow_request(self, user_id="default") -> bool:
        current_time = int(time.time())
        window_key = f"{self.key}:{user_id}"
        # 使用Redis事务保证原子性
        pipe = self.redis.pipeline()
        pipe.zadd(window_key, {current_time: current_time})
        pipe.zremrangebyscore(window_key, 0, current_time - self.window_seconds)
        pipe.zcard(window_key)
        pipe.expire(window_key, self.window_seconds + 1)
        _, _, request_count, _ = pipe.execute()
        return request_count <= self.max_requests
# Redis配置
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_limiter = RedisSlidingWindowLimiter(redis_client, max_requests=10, window_seconds=1)
# 使用示例
for i in range(15):
    if redis_limiter.allow_request("user123"):
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")

装饰器实现(通用方案)

from functools import wraps
import time
from threading import Lock
def rate_limiter(max_requests=10, window_seconds=1):
    """限流装饰器"""
    limiter = CounterLimiter(max_requests, window_seconds)
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not limiter.allow_request():
                raise Exception("请求过于频繁,请稍后再试")
            return func(*args, **kwargs)
        return wrapper
    return decorator
# 使用示例
@rate_limiter(max_requests=5, window_seconds=10)
def api_endpoint():
    return "API响应成功"
# 测试
for i in range(10):
    try:
        result = api_endpoint()
        print(f"请求 {i}: {result}")
    except Exception as e:
        print(f"请求 {i}: {e}")

综合实现(Flask示例)

from flask import Flask, request, jsonify
from functools import wraps
import time
from threading import Lock
app = Flask(__name__)
# 存储用户IP的请求记录
request_records = {}
lock = Lock()
def ip_limiter(max_requests=100, window_seconds=60):
    """基于IP的限流"""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ip = request.remote_addr
            current_time = time.time()
            with lock:
                if ip not in request_records:
                    request_records[ip] = []
                # 清理过期记录
                request_records[ip] = [
                    req_time for req_time in request_records[ip]
                    if current_time - req_time <= window_seconds
                ]
                # 检查是否超限
                if len(request_records[ip]) >= max_requests:
                    return jsonify({
                        "error": "请求过于频繁",
                        "retry_after": window_seconds
                    }), 429
                request_records[ip].append(current_time)
            return func(*args, **kwargs)
        return wrapper
    return decorator
@app.route('/api/data')
@ip_limiter(max_requests=5, window_seconds=10)
def get_data():
    return jsonify({"data": "success"})
if __name__ == '__main__':
    app.run(debug=True)

使用建议

  1. 选择算法

    • 简单场景:计数器法
    • 高精度:滑动窗口
    • 平滑流量:令牌桶/漏桶
    • 分布式系统:Redis方案
  2. 配置参数

    • 根据业务需求设置合理的阈值
    • 考虑峰值流量
  3. 错误处理

    • 返回合适的HTTP状态码(如429)
    • 提供重试时间信息
  4. 监控告警

    • 记录限流日志
    • 监控指标统计

根据具体业务场景选择合适的限流策略,可以有效保护系统不被过载。

抱歉,评论功能暂时关闭!