本文目录导读:

我来介绍几种Python实现限流防护的常用方法:
计数器法(简单实现)
import time
from threading import Lock
class CounterLimiter:
    """Fixed-window counter rate limiter.

    At most ``max_requests`` calls to :meth:`allow_request` succeed within
    each window of ``window_seconds``. When a window has elapsed, the counter
    is reset and a new window starts at the time of the current call.

    All state changes happen while holding ``self.lock``.

    :param max_requests: number of requests allowed per window
    :param window_seconds: window length in seconds
    """

    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.counter = 0
        # A monotonic clock is used for elapsed-time measurement so that
        # system clock adjustments cannot shorten or stretch a window.
        self.window_start = time.monotonic()
        self.lock = Lock()

    def allow_request(self) -> bool:
        """Record one request and report whether it is within the limit.

        :return: ``True`` if the request is allowed, ``False`` otherwise
        """
        with self.lock:
            now = time.monotonic()
            # Start a fresh window once the current one has elapsed.
            if now - self.window_start >= self.window_seconds:
                self.counter = 0
                self.window_start = now
            if self.counter < self.max_requests:
                self.counter += 1
                return True
            return False
# Usage example: fire 15 back-to-back requests at a limiter that allows
# 10 requests per 1-second window and print the decision for each one.
limiter = CounterLimiter(max_requests=10, window_seconds=1)
for i in range(15):
    if limiter.allow_request():
        print(f"请求 {i}: 允许")
    else:
        print(f"请求 {i}: 拒绝")
滑动窗口算法
from collections import deque
import time
from threading import Lock
class SlidingWindowLimiter:
    """Sliding-window rate limiter backed by a deque of timestamps.

    The timestamp of every allowed request is stored. A new request is
    allowed only if fewer than ``max_requests`` stored timestamps are still
    inside the last ``window_seconds``.

    All state changes happen while holding ``self.lock``.

    :param max_requests: number of requests allowed per window
    :param window_seconds: window length in seconds
    """

    def __init__(self, max_requests=100, window_seconds=60):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        # Timestamps of allowed requests, oldest first.
        self.requests = deque()
        self.lock = Lock()

    def allow_request(self) -> bool:
        """Record one request and report whether it is within the limit.

        :return: ``True`` if the request is allowed, ``False`` otherwise
        """
        with self.lock:
            # Monotonic clock: elapsed-time comparisons must not be affected
            # by system clock adjustments.
            now = time.monotonic()
            # Drop timestamps that have slid out of the window.
            while self.requests and now - self.requests[0] > self.window_seconds:
                self.requests.popleft()
            # Only allowed requests are recorded; rejected ones leave no trace.
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            return False
# Usage example: 10 requests spaced 0.5 s apart against a limiter that
# allows 5 requests per 10-second window.
sliding_limiter = SlidingWindowLimiter(max_requests=5, window_seconds=10)
for i in range(10):
    if sliding_limiter.allow_request():
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
    time.sleep(0.5)
令牌桶算法
import time
from threading import Lock
class TokenBucketLimiter:
    """Token-bucket rate limiter.

    The bucket starts full with ``capacity`` tokens and is refilled at
    ``rate`` tokens per second, never exceeding ``capacity``. A request is
    allowed when enough tokens are available, and consumes them.

    All state changes made through :meth:`allow_request` happen while
    holding ``self.lock``.
    """

    def __init__(self, rate=10, capacity=20):
        """
        :param rate: token generation rate (tokens per second)
        :param capacity: bucket capacity
        """
        self.rate = rate
        self.capacity = capacity
        self.tokens = capacity
        # Monotonic clock: refill amounts depend on elapsed time and must
        # not be distorted by system clock adjustments.
        self.last_refill = time.monotonic()
        self.lock = Lock()

    def _refill(self):
        """Add the tokens accrued since the last refill, capped at capacity.

        Called by :meth:`allow_request` with ``self.lock`` already held.
        """
        now = time.monotonic()
        elapsed = now - self.last_refill
        self.tokens = min(self.capacity, self.tokens + elapsed * self.rate)
        self.last_refill = now

    def allow_request(self, tokens_needed=1) -> bool:
        """Try to consume ``tokens_needed`` tokens.

        :param tokens_needed: number of tokens this request costs
        :return: ``True`` if the tokens were available and consumed,
            ``False`` otherwise (the bucket is left untouched)
        """
        with self.lock:
            self._refill()
            if self.tokens >= tokens_needed:
                self.tokens -= tokens_needed
                return True
            return False
# Usage example: 15 requests spaced 0.1 s apart against a bucket that holds
# 10 tokens and refills at 5 tokens per second.
token_limiter = TokenBucketLimiter(rate=5, capacity=10)
for i in range(15):
    if token_limiter.allow_request():
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
    time.sleep(0.1)
漏桶算法
import time
from threading import Lock
class LeakyBucketLimiter:
    """Leaky-bucket rate limiter.

    Every allowed request adds one unit of "water" to the bucket. Water
    drains at ``rate`` units per second. A request is allowed while the
    water level is below ``capacity``.

    All state changes made through :meth:`allow_request` happen while
    holding ``self.lock``.
    """

    def __init__(self, rate=10, capacity=20):
        """
        :param rate: leak rate (units per second)
        :param capacity: bucket capacity
        """
        self.rate = rate
        self.capacity = capacity
        self.water = 0
        # Monotonic clock: the leaked amount depends on elapsed time and
        # must not be distorted by system clock adjustments.
        self.last_leak = time.monotonic()
        self.lock = Lock()

    def _leak(self):
        """Drain the water that leaked since the last call, flooring at 0.

        Called by :meth:`allow_request` with ``self.lock`` already held.
        """
        now = time.monotonic()
        elapsed = now - self.last_leak
        leaked = elapsed * self.rate
        self.water = max(0, self.water - leaked)
        self.last_leak = now

    def allow_request(self) -> bool:
        """Record one request and report whether the bucket had room.

        :return: ``True`` if the request is allowed, ``False`` otherwise
        """
        with self.lock:
            self._leak()
            if self.water < self.capacity:
                self.water += 1
                return True
            return False
# Usage example: 10 requests spaced 0.2 s apart against a bucket with
# capacity 5 that leaks 3 units per second.
leaky_limiter = LeakyBucketLimiter(rate=3, capacity=5)
for i in range(10):
    if leaky_limiter.allow_request():
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
    time.sleep(0.2)
基于Redis的分布式限流
import redis
import time
class RedisSlidingWindowLimiter:
    """Sliding-window rate limiter stored in a Redis sorted set.

    Each request is added to a per-user sorted set, scored by its timestamp.
    Entries older than the window are removed, and the remaining cardinality
    is compared against ``max_requests``.

    :param redis_client: client object exposing ``pipeline()``
    :param key: key prefix; the per-user key is ``"<key>:<user_id>"``
    :param max_requests: number of requests allowed per window
    :param window_seconds: window length in seconds
    """

    def __init__(self, redis_client, key="rate_limit", max_requests=100, window_seconds=60):
        self.redis = redis_client
        self.key = key
        self.max_requests = max_requests
        self.window_seconds = window_seconds

    def allow_request(self, user_id="default") -> bool:
        """Record one request for ``user_id`` and report whether it is allowed.

        Note that the request is recorded before the count is checked, so
        rejected requests also occupy a slot in the window.

        :param user_id: identifier appended to the key prefix
        :return: ``True`` if the number of requests in the window, including
            this one, does not exceed ``max_requests``
        """
        # Local import keeps this block self-contained.
        import uuid

        # Wall-clock time is used here because the timestamps are stored in
        # Redis rather than in this process.
        now = time.time()
        window_key = f"{self.key}:{user_id}"
        # Sorted-set members are unique. Using the integer-second timestamp
        # as the member made every request within the same second overwrite
        # the previous one, so the count never grew. A unique member per
        # request fixes that.
        member = f"{now:.6f}-{uuid.uuid4().hex}"

        # Queue the commands on one pipeline and execute them together.
        pipe = self.redis.pipeline()
        pipe.zadd(window_key, {member: now})
        pipe.zremrangebyscore(window_key, 0, now - self.window_seconds)
        pipe.zcard(window_key)
        # The key expires shortly after the window; TTL coerced to an
        # integer number of seconds.
        pipe.expire(window_key, int(self.window_seconds) + 1)
        _, _, request_count, _ = pipe.execute()
        return request_count <= self.max_requests
# Redis connection settings (local server, default port, database 0).
redis_client = redis.Redis(host='localhost', port=6379, db=0)
redis_limiter = RedisSlidingWindowLimiter(redis_client, max_requests=10, window_seconds=1)

# Usage example: 15 back-to-back requests for the same user against a limit
# of 10 requests per 1-second window.
for i in range(15):
    if redis_limiter.allow_request("user123"):
        print(f"请求 {i}: 通过")
    else:
        print(f"请求 {i}: 限流")
装饰器实现(通用方案)
from functools import wraps
import time
from threading import Lock
class RateLimitExceeded(Exception):
    """Raised when a function wrapped by ``rate_limiter`` is over its limit."""


def rate_limiter(max_requests=10, window_seconds=1):
    """Build a decorator that rate-limits calls to the wrapped function.

    One ``CounterLimiter`` is created per call to ``rate_limiter``; every
    function decorated with the returned decorator shares that limiter.

    :param max_requests: number of calls allowed per window
    :param window_seconds: window length in seconds
    :return: a decorator; the wrapped function raises ``RateLimitExceeded``
        (a subclass of ``Exception``) when the limit is exceeded
    """
    limiter = CounterLimiter(max_requests, window_seconds)

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            if not limiter.allow_request():
                # A dedicated exception type lets callers tell rate limiting
                # apart from failures of the wrapped function itself.
                raise RateLimitExceeded("请求过于频繁,请稍后再试")
            return func(*args, **kwargs)

        return wrapper

    return decorator
# Usage example: an endpoint limited to 5 calls per 10-second window.
@rate_limiter(max_requests=5, window_seconds=10)
def api_endpoint():
    """Return a fixed success message."""
    return "API响应成功"


# Test: call the endpoint 10 times and print either the result or the
# rate-limit error message.
for i in range(10):
    try:
        result = api_endpoint()
        print(f"请求 {i}: {result}")
    except Exception as e:
        print(f"请求 {i}: {e}")
综合实现(Flask示例)
from flask import Flask, request, jsonify
from functools import wraps
import time
from threading import Lock
app = Flask(__name__)

# Request timestamps per client IP: {ip: [timestamp, ...]}.
# NOTE(review): entries for IPs that stop sending requests are never removed,
# so this dict grows with the number of distinct clients seen.
request_records = {}
lock = Lock()


def ip_limiter(max_requests=100, window_seconds=60):
    """Build a decorator that rate-limits a Flask view per client IP.

    The client is identified by ``request.remote_addr``. Timestamps of
    allowed requests are kept in the module-level ``request_records`` dict,
    guarded by the module-level ``lock``.

    :param max_requests: number of requests allowed per IP per window
    :param window_seconds: window length in seconds
    :return: a decorator; the wrapped view returns a JSON error body with
        HTTP status 429 when the caller is over the limit
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            ip = request.remote_addr
            # Monotonic clock: window checks compare elapsed time and must
            # not be affected by system clock adjustments.
            current_time = time.monotonic()
            with lock:
                # Keep only the timestamps that are still inside the window.
                recent = [
                    req_time for req_time in request_records.setdefault(ip, [])
                    if current_time - req_time <= window_seconds
                ]
                request_records[ip] = recent
                # Reject when the window is already full.
                if len(recent) >= max_requests:
                    return jsonify({
                        "error": "请求过于频繁",
                        "retry_after": window_seconds
                    }), 429
                recent.append(current_time)
            # Run the view outside the lock so that a slow handler does not
            # block rate-limit checks for other requests.
            return func(*args, **kwargs)

        return wrapper

    return decorator
@app.route('/api/data')
@ip_limiter(max_requests=5, window_seconds=10)
def get_data():
    """Return a fixed JSON payload; limited to 5 requests per 10 s per IP."""
    return jsonify({"data": "success"})


if __name__ == '__main__':
    # NOTE(review): debug=True is meant for local development only; Flask's
    # debug mode must not be enabled on a publicly reachable server.
    app.run(debug=True)
使用建议
- 选择算法:
  - 简单场景:计数器法
  - 高精度:滑动窗口
  - 平滑流量:令牌桶/漏桶
  - 分布式系统:Redis方案
- 配置参数:
  - 根据业务需求设置合理的阈值
  - 考虑峰值流量
- 错误处理:
  - 返回合适的HTTP状态码(如429)
  - 提供重试时间信息
- 监控告警:
  - 记录限流日志
  - 监控指标统计
根据具体业务场景选择合适的限流策略,可以有效保护系统不被过载。