Python案例怎么防止接口重复提交?

wen python案例 19

本文目录导读:

Python案例怎么防止接口重复提交?

  1. 基于Redis的分布式锁方案(最常用)
  2. 基于Token令牌机制
  3. 基于幂等性的方案(适合支付等场景)
  4. 基于前端防抖/节流+后端验证的组合方案
  5. 使用分布式ID生成器
  6. 最佳实践建议

防止接口重复提交通常有多种实现方案,我为您介绍几种常见且实用的Python实现方法。

基于Redis的分布式锁方案(最常用)

import redis
import time
import uuid
from functools import wraps
from flask import Flask, request, jsonify
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
def prevent_duplicate_submit(timeout=3):
    """
    防止重复提交装饰器
    :param timeout: 锁定时间(秒)
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 生成唯一标识(基于用户ID和接口路径)
            user_id = request.headers.get('X-User-Id', 'anonymous')
            api_path = request.path
            lock_key = f"lock:{user_id}:{api_path}"
            request_id = str(uuid.uuid4())
            # 尝试获取锁
            if redis_client.setnx(lock_key, request_id):
                # 设置过期时间
                redis_client.expire(lock_key, timeout)
                try:
                    return func(*args, **kwargs)
                finally:
                    # 使用Lua脚本保证原子性删除
                    script = """
                        if redis.call('get', KEYS[1]) == ARGV[1] then
                            return redis.call('del', KEYS[1])
                        else
                            return 0
                        end
                    """
                    redis_client.eval(script, 1, lock_key, request_id)
            else:
                return jsonify({
                    'code': 429,
                    'message': '请求正在处理中,请勿重复提交',
                    'data': None
                }), 429
        return wrapper
    return decorator
@app.route('/api/submit', methods=['POST'])
@prevent_duplicate_submit(timeout=5)
def submit_order():
    # 业务逻辑
    return jsonify({'code': 200, 'message': 'success'})

基于Token令牌机制

import hashlib
import time
from flask import Flask, request, jsonify, session
app = Flask(__name__)
app.secret_key = 'your-secret-key'
class TokenManager:
    """Token管理器"""
    @staticmethod
    def generate_token():
        """生成请求令牌"""
        # 生成基于时间的唯一令牌
        timestamp = str(int(time.time() * 1000))
        random_str = str(uuid.uuid4())
        token_str = f"{timestamp}:{random_str}"
        return hashlib.md5(token_str.encode()).hexdigest()
    @staticmethod
    def validate_token(token):
        """验证令牌并删除(一次性使用)"""
        if token in session:
            del session[token]
            return True
        return False
@app.route('/api/get-token', methods=['GET'])
def get_token():
    """获取提交令牌"""
    token = TokenManager.generate_token()
    session[token] = True
    return jsonify({'token': token})
@app.route('/api/submit-with-token', methods=['POST'])
def submit_with_token():
    """带令牌提交"""
    token = request.headers.get('X-Request-Token')
    if not token:
        return jsonify({'code': 400, 'message': '缺少请求令牌'}), 400
    if not TokenManager.validate_token(token):
        return jsonify({
            'code': 429, 
            'message': '令牌已过期或已使用,请重新获取'
        }), 429
    # 业务处理
    return jsonify({'code': 200, 'message': '提交成功'})

基于幂等性的方案(适合支付等场景)

import hashlib
import json
from datetime import datetime
from flask import Flask, request, jsonify
from sqlalchemy import create_engine, Column, String, DateTime
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
app = Flask(__name__)
Base = declarative_base()
class IdempotentRecord(Base):
    """幂等性记录表"""
    __tablename__ = 'idempotent_records'
    idempotent_key = Column(String(64), primary_key=True)
    request_body = Column(String(2000))
    response_body = Column(String(2000))
    create_time = Column(DateTime, default=datetime.now)
class IdempotentManager:
    """幂等性管理器"""
    def __init__(self, db_url):
        engine = create_engine(db_url)
        Base.metadata.create_all(engine)
        self.Session = sessionmaker(bind=engine)
    def generate_key(self, request_data):
        """生成幂等键"""
        # 基于请求内容生成唯一键
        data_str = json.dumps(request_data, sort_keys=True)
        return hashlib.md5(data_str.encode()).hexdigest()
    def process_with_idempotent(self, key, process_func):
        """
        幂等性处理
        返回: (是否新处理, 处理结果)
        """
        session = self.Session()
        try:
            # 检查是否已处理
            record = session.query(IdempotentRecord).filter_by(
                idempotent_key=key
            ).first()
            if record:
                # 已处理过,返回之前的结果
                return False, json.loads(record.response_body)
            # 执行处理函数
            result = process_func()
            # 保存处理记录
            record = IdempotentRecord(
                idempotent_key=key,
                request_body=json.dumps(request.json),
                response_body=json.dumps(result)
            )
            session.add(record)
            session.commit()
            return True, result
        except Exception as e:
            session.rollback()
            raise e
        finally:
            session.close()
# 初始化幂等性管理器
idempotent_mgr = IdempotentManager('sqlite:///idempotent.db')
@app.route('/api/payment', methods=['POST'])
def payment():
    """支付接口(幂等性处理)"""
    # 生成幂等键
    idempotent_key = idempotent_mgr.generate_key(request.json)
    def process_payment():
        # 实际的支付处理逻辑
        return {'order_id': '123456', 'amount': 100.00, 'status': 'success'}
    is_new, result = idempotent_mgr.process_with_idempotent(
        idempotent_key, process_payment
    )
    if not is_new:
        result['message'] = '此次请求之前已处理'
    return jsonify(result)

基于前端防抖/节流+后端验证的组合方案

# 后端验证中间件
from flask import Flask, request, jsonify, g
from functools import wraps
import time
app = Flask(__name__)
class RequestThrottle:
    """请求节流器"""
    def __init__(self):
        # 存储每个IP的请求时间戳
        self.request_records = {}
    def check_throttle(self, ip, interval=2):
        """
        检查请求是否过快
        :param interval: 请求间隔(秒)
        """
        current_time = time.time()
        last_request_time = self.request_records.get(ip)
        if last_request_time and (current_time - last_request_time) < interval:
            return False
        self.request_records[ip] = current_time
        return True
throttle = RequestThrottle()
@app.before_request
def check_request_rate():
    """请求频率检查"""
    if request.method == 'POST':
        ip = request.remote_addr
        if not throttle.check_throttle(ip, interval=0.5):  # 500ms内不允许重复提交
            return jsonify({
                'code': 429,
                'message': '请求过于频繁,请稍后再试'
            }), 429
# 前端防抖示例(JavaScript)
frontend_code = """
// 防抖函数
function debounce(fn, delay = 1000) {
    let timer = null;
    return function(...args) {
        if (timer) {
            clearTimeout(timer);
        }
        timer = setTimeout(() => {
            fn.apply(this, args);
            timer = null;
        }, delay);
    };
}
// 节流函数
function throttle(fn, delay = 1000) {
    let canRun = true;
    return function(...args) {
        if (!canRun) return;
        canRun = false;
        setTimeout(() => {
            fn.apply(this, args);
            canRun = true;
        }, delay);
    };
}
// 使用示例
const submitOrder = debounce(async (data) => {
    try {
        const response = await fetch('/api/submit', {
            method: 'POST',
            headers: {
                'Content-Type': 'application/json'
            },
            body: JSON.stringify(data)
        });
        const result = await response.json();
        console.log(result);
    } catch (error) {
        console.error('提交失败:', error);
    }
}, 1000);
"""

使用分布式ID生成器

import time
import socket
import struct
from functools import wraps
from flask import Flask, request, jsonify
app = Flask(__name__)
class DistributedIdGenerator:
    """分布式ID生成器(雪花算法简化版)"""
    def __init__(self, worker_id=1, datacenter_id=1):
        self.worker_id = worker_id
        self.datacenter_id = datacenter_id
        self.sequence = 0
        self.last_timestamp = -1
    def generate_id(self):
        """生成唯一ID"""
        timestamp = int(time.time() * 1000)
        if timestamp == self.last_timestamp:
            self.sequence = (self.sequence + 1) & 4095
            if self.sequence == 0:
                timestamp = self.wait_next_millis()
        else:
            self.sequence = 0
        self.last_timestamp = timestamp
        # 组合ID (时间戳 + 数据中心 + 工作节点 + 序列号)
        id_str = f"{timestamp}{self.datacenter_id:02d}{self.worker_id:02d}{self.sequence:04d}"
        return int(id_str)
    def wait_next_millis(self):
        """等待下一毫秒"""
        timestamp = int(time.time() * 1000)
        while timestamp <= self.last_timestamp:
            timestamp = int(time.time() * 1000)
        return timestamp
id_generator = DistributedIdGenerator()
@app.route('/api/order', methods=['POST'])
def create_order():
    """创建订单接口"""
    # 生成唯一请求ID
    request_id = id_generator.generate_id()
    # 使用请求ID进行去重处理
    # 可以存储到数据库或Redis中
    return jsonify({
        'code': 200,
        'request_id': request_id,
        'message': '订单创建成功'
    })

最佳实践建议

  1. 选择合适方案

    • 高并发场景:推荐Redis分布式锁
    • 重要交易场景:使用幂等性方案
    • 简单场景:Token机制或防抖处理
  2. 多层级防护

    • 前端:按钮禁用、防抖/节流
    • 网络层:Nginx限流
    • 应用层:分布式锁/令牌机制
    • 数据层:数据库唯一索引
  3. 性能优化

    • Redis锁过期时间要合理设置
    • 使用Lua脚本保证原子性
    • 定期清理过期数据
  4. 异常处理

    • 网络超时重试机制
    • 锁超时自动释放
    • 完善的错误提示

根据您的具体业务场景选择合适的方案,通常建议组合使用多种方法以达到最佳效果。

抱歉,评论功能暂时关闭!