本文目录导读:

防止接口重复提交通常有多种实现方案,我为您介绍几种常见且实用的Python实现方法。
基于Redis的分布式锁方案(最常用)
import redis
import time
import uuid
from functools import wraps
from flask import Flask, request, jsonify
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def prevent_duplicate_submit(timeout=3):
"""
防止重复提交装饰器
:param timeout: 锁定时间(秒)
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
# 生成唯一标识(基于用户ID和接口路径)
user_id = request.headers.get('X-User-Id', 'anonymous')
api_path = request.path
lock_key = f"lock:{user_id}:{api_path}"
request_id = str(uuid.uuid4())
# 尝试获取锁
if redis_client.setnx(lock_key, request_id):
# 设置过期时间
redis_client.expire(lock_key, timeout)
try:
return func(*args, **kwargs)
finally:
# 使用Lua脚本保证原子性删除
script = """
if redis.call('get', KEYS[1]) == ARGV[1] then
return redis.call('del', KEYS[1])
else
return 0
end
"""
redis_client.eval(script, 1, lock_key, request_id)
else:
return jsonify({
'code': 429,
'message': '请求正在处理中,请勿重复提交',
'data': None
}), 429
return wrapper
return decorator
@app.route('/api/submit', methods=['POST'])
@prevent_duplicate_submit(timeout=5)
def submit_order():
# 业务逻辑
return jsonify({'code': 200, 'message': 'success'})
基于Token令牌机制
import hashlib
import time
from flask import Flask, request, jsonify, session
app = Flask(__name__)
app.secret_key = 'your-secret-key'
class TokenManager:
"""Token管理器"""
@staticmethod
def generate_token():
"""生成请求令牌"""
# 生成基于时间的唯一令牌
timestamp = str(int(time.time() * 1000))
random_str = str(uuid.uuid4())
token_str = f"{timestamp}:{random_str}"
return hashlib.md5(token_str.encode()).hexdigest()
@staticmethod
def validate_token(token):
"""验证令牌并删除(一次性使用)"""
if token in session:
del session[token]
return True
return False
@app.route('/api/get-token', methods=['GET'])
def get_token():
"""获取提交令牌"""
token = TokenManager.generate_token()
session[token] = True
return jsonify({'token': token})
@app.route('/api/submit-with-token', methods=['POST'])
def submit_with_token():
"""带令牌提交"""
token = request.headers.get('X-Request-Token')
if not token:
return jsonify({'code': 400, 'message': '缺少请求令牌'}), 400
if not TokenManager.validate_token(token):
return jsonify({
'code': 429,
'message': '令牌已过期或已使用,请重新获取'
}), 429
# 业务处理
return jsonify({'code': 200, 'message': '提交成功'})
基于幂等性的方案(适合支付等场景)
import hashlib
import json
from datetime import datetime
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
app = Flask(__name__)
Base = declarative_base()
class IdempotentRecord(Base):
"""幂等性记录表"""
__tablename__ = 'idempotent_records'
idempotent_key = Column(String(64), primary_key=True)
request_body = Column(String(2000))
response_body = Column(String(2000))
create_time = Column(DateTime, default=datetime.now)
class IdempotentManager:
"""幂等性管理器"""
def __init__(self, db_url):
engine = create_engine(db_url)
Base.metadata.create_all(engine)
self.Session = sessionmaker(bind=engine)
def generate_key(self, request_data):
"""生成幂等键"""
# 基于请求内容生成唯一键
data_str = json.dumps(request_data, sort_keys=True)
return hashlib.md5(data_str.encode()).hexdigest()
def process_with_idempotent(self, key, process_func):
"""
幂等性处理
返回: (是否新处理, 处理结果)
"""
session = self.Session()
try:
# 检查是否已处理
record = session.query(IdempotentRecord).filter_by(
idempotent_key=key
).first()
if record:
# 已处理过,返回之前的结果
return False, json.loads(record.response_body)
# 执行处理函数
result = process_func()
# 保存处理记录
record = IdempotentRecord(
idempotent_key=key,
request_body=json.dumps(request.json),
response_body=json.dumps(result)
)
session.add(record)
session.commit()
return True, result
except Exception as e:
session.rollback()
raise e
finally:
session.close()
# 初始化幂等性管理器
idempotent_mgr = IdempotentManager('sqlite:///idempotent.db')
@app.route('/api/payment', methods=['POST'])
def payment():
"""支付接口(幂等性处理)"""
# 生成幂等键
idempotent_key = idempotent_mgr.generate_key(request.json)
def process_payment():
# 实际的支付处理逻辑
return {'order_id': '123456', 'amount': 100.00, 'status': 'success'}
is_new, result = idempotent_mgr.process_with_idempotent(
idempotent_key, process_payment
)
if not is_new:
result['message'] = '此次请求之前已处理'
return jsonify(result)
基于前端防抖/节流+后端验证的组合方案
# 后端验证中间件
from flask import Flask, request, jsonify, g
from functools import wraps
import time
app = Flask(__name__)
class RequestThrottle:
"""请求节流器"""
def __init__(self):
# 存储每个IP的请求时间戳
self.request_records = {}
def check_throttle(self, ip, interval=2):
"""
检查请求是否过快
:param interval: 请求间隔(秒)
"""
current_time = time.time()
last_request_time = self.request_records.get(ip)
if last_request_time and (current_time - last_request_time) < interval:
return False
self.request_records[ip] = current_time
return True
throttle = RequestThrottle()
@app.before_request
def check_request_rate():
"""请求频率检查"""
if request.method == 'POST':
ip = request.remote_addr
if not throttle.check_throttle(ip, interval=0.5): # 500ms内不允许重复提交
return jsonify({
'code': 429,
'message': '请求过于频繁,请稍后再试'
}), 429
# 前端防抖示例(JavaScript)
frontend_code = """
// 防抖函数
function debounce(fn, delay = 1000) {
let timer = null;
return function(...args) {
if (timer) {
clearTimeout(timer);
}
timer = setTimeout(() => {
fn.apply(this, args);
timer = null;
}, delay);
};
}
// 节流函数
function throttle(fn, delay = 1000) {
let canRun = true;
return function(...args) {
if (!canRun) return;
canRun = false;
setTimeout(() => {
fn.apply(this, args);
canRun = true;
}, delay);
};
}
// 使用示例
const submitOrder = debounce(async (data) => {
try {
const response = await fetch('/api/submit', {
method: 'POST',
headers: {
'Content-Type': 'application/json'
},
body: JSON.stringify(data)
});
const result = await response.json();
console.log(result);
} catch (error) {
console.error('提交失败:', error);
}
}, 1000);
"""
使用分布式ID生成器
import time
import socket
import struct
from functools import wraps
from flask import Flask, request, jsonify
app = Flask(__name__)
class DistributedIdGenerator:
"""分布式ID生成器(雪花算法简化版)"""
def __init__(self, worker_id=1, datacenter_id=1):
self.worker_id = worker_id
self.datacenter_id = datacenter_id
self.sequence = 0
self.last_timestamp = -1
def generate_id(self):
"""生成唯一ID"""
timestamp = int(time.time() * 1000)
if timestamp == self.last_timestamp:
self.sequence = (self.sequence + 1) & 4095
if self.sequence == 0:
timestamp = self.wait_next_millis()
else:
self.sequence = 0
self.last_timestamp = timestamp
# 组合ID (时间戳 + 数据中心 + 工作节点 + 序列号)
id_str = f"{timestamp}{self.datacenter_id:02d}{self.worker_id:02d}{self.sequence:04d}"
return int(id_str)
def wait_next_millis(self):
"""等待下一毫秒"""
timestamp = int(time.time() * 1000)
while timestamp <= self.last_timestamp:
timestamp = int(time.time() * 1000)
return timestamp
id_generator = DistributedIdGenerator()
@app.route('/api/order', methods=['POST'])
def create_order():
"""创建订单接口"""
# 生成唯一请求ID
request_id = id_generator.generate_id()
# 使用请求ID进行去重处理
# 可以存储到数据库或Redis中
return jsonify({
'code': 200,
'request_id': request_id,
'message': '订单创建成功'
})
最佳实践建议
-
选择合适方案:
- 高并发场景:推荐Redis分布式锁
- 重要交易场景:使用幂等性方案
- 简单场景:Token机制或防抖处理
-
多层级防护:
- 前端:按钮禁用、防抖/节流
- 网络层:Nginx限流
- 应用层:分布式锁/令牌机制
- 数据层:数据库唯一索引
-
性能优化:
- Redis锁过期时间要合理设置
- 使用Lua脚本保证原子性
- 定期清理过期数据
-
异常处理:
- 网络超时重试机制
- 锁超时自动释放
- 完善的错误提示
根据您的具体业务场景选择合适的方案,通常建议组合使用多种方法以达到最佳效果。