Python案例如何实现接口幂等性?

wen python案例 15

Python接口幂等性实现方案

什么是接口幂等性

幂等性是指多次执行同一操作执行一次产生的结果相同,常见场景:

Python案例如何实现接口幂等性?

  • 支付系统(防止重复扣款)
  • 订单系统(防止重复下单)
  • 消息队列(消息重复消费)

基于Token的方案

import uuid
import redis
from flask import Flask, request, jsonify, make_response
app = Flask(__name__)
redis_client = redis.Redis(host='localhost', port=6379, db=0)
# 生成幂等性Token
@app.route('/api/idempotent/token', methods=['GET'])
def get_idempotent_token():
    token = str(uuid.uuid4())
    # 存储到Redis,设置过期时间(防止内存泄漏)
    redis_client.setex(f"idempotent:{token}", 3600, "active")
    return jsonify({"token": token})
# 带幂等性校验的接口
@app.route('/api/order/create', methods=['POST'])
def create_order():
    token = request.headers.get('Idempotent-Token')
    if not token:
        return jsonify({"error": "缺少幂等性Token"}), 400
    if not redis_client.get(f"idempotent:{token}"):
        return jsonify({"error": "Token已过期或已使用"}), 409
    # 删除Token(保证幂等性)
    if not redis_client.delete(f"idempotent:{token}"):
        # 并发情况下可能已被删除,说明请求正在处理中
        return jsonify({"message": "请求正在处理中"}), 200
    # 执行业务逻辑
    order_data = request.json
    # ... 创建订单逻辑
    return jsonify({"message": "订单创建成功", "order_id": "123456"}), 200

基于数据库唯一键的方案

from sqlalchemy import create_engine, Column, String, Integer, UniqueConstraint
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
Base = declarative_base()
class Order(Base):
    __tablename__ = 'orders'
    id = Column(Integer, primary_key=True)
    order_id = Column(String(64), unique=True)  # 唯一约束
    user_id = Column(Integer)
    amount = Column(Integer)
    status = Column(String(32))
# 使用数据库唯一键实现幂等性
@app.route('/api/payment/pay', methods=['POST'])
def payment_pay():
    payment_data = request.json
    payment_id = payment_data.get('payment_id')  # 业务幂等键
    try:
        # 使用INSERT...ON DUPLICATE KEY UPDATE(MySQL)
        # 或使用SELECT和INSERT组合判断
        # 检查是否已处理
        existing = session.query(Order).filter_by(
            order_id=payment_id
        ).first()
        if existing:
            return jsonify({
                "message": "订单已处理",
                "order_id": existing.order_id
            }), 200
        # 创建新订单
        new_order = Order(order_id=payment_id, user_id=...)
        session.add(new_order)
        session.commit()
        return jsonify({"order_id": payment_id}), 201
    except Exception as e:
        session.rollback()
        return jsonify({"error": "处理失败"}), 500

基于Redis分布式锁的方案

import time
import hashlib
from contextlib import contextmanager
class IdempotentManager:
    def __init__(self, redis_client):
        self.redis = redis_client
    def get_idempotent_key(self, request_data):
        """生成幂等性检查键"""
        # 对请求参数进行哈希
        content = f"{request_data.get('user_id')}:{request_data.get('amount')}"
        return hashlib.md5(content.encode()).hexdigest()
    @contextmanager
    def lock(self, key, timeout=30):
        """分布式锁实现"""
        lock_key = f"idempotent:lock:{key}"
        while True:
            if self.redis.setnx(lock_key, time.time()):
                # 获取锁成功
                self.redis.expire(lock_key, timeout)
                try:
                    yield
                finally:
                    self.redis.delete(lock_key)
                break
            else:
                # 锁已被占用,等待
                time.sleep(0.1)
                # 检查锁是否过期
                if time.time() - float(self.redis.get(lock_key) or 0) > timeout:
                    self.redis.delete(lock_key)
    def check_and_process(self, request_data, process_func):
        """幂等性处理"""
        idempotent_key = self.get_idempotent_key(request_data)
        process_key = f"idempotent:process:{idempotent_key}"
        with self.lock(idempotent_key):
            # 检查是否已处理
            if self.redis.get(process_key):
                return {'status': 'processed', 'message': '请求已处理'}
            # 执行业务逻辑
            result = process_func(request_data)
            # 标记已处理
            self.redis.setex(process_key, 3600, 'done')
            return result
# 使用示例
@app.route('/api/transfer', methods=['POST'])
def transfer():
    request_data = request.json
    manager = IdempotentManager(redis_client)
    def process(data):
        # 实际的转账逻辑
        from_account = data['from_account']
        to_account = data['to_account']
        amount = data['amount']
        # 执行转账...
        return {'status': 'success', 'message': '转账成功'}
    result = manager.check_and_process(request_data, process)
    return jsonify(result)

完整的生产级实现

import functools
from flask import request, g
from datetime import datetime
def idempotent_decorator(timeout=3600):
    """幂等性装饰器"""
    def decorator(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            # 获取幂等性标识
            idempotent_key = request.headers.get('X-Idempotent-Key')
            if not idempotent_key:
                return jsonify({'error': '缺少幂等性标识'}), 400
            # 检查是否已处理
            cache_key = f"idempotent:{f.__name__}:{idempotent_key}"
            cached_response = redis_client.get(cache_key)
            if cached_response:
                return jsonify({
                    'code': 200,
                    'message': '重复请求',
                    'data': cached_response
                }), 200
            # 标记处理中
            processing_flag = f"{cache_key}:processing"
            if not redis_client.setnx(processing_flag, time.time()):
                return jsonify({'code': 429, 'message': '请求正在处理中'}), 429
            redis_client.expire(processing_flag, 30)  # 防止死锁
            try:
                # 执行原函数
                result = f(*args, **kwargs)
                # 缓存结果
                redis_client.setex(cache_key, timeout, result)
                return result
            except Exception as e:
                # 发生异常,清理处理标记
                redis_client.delete(processing_flag)
                raise
            finally:
                redis_client.delete(processing_flag)
        return wrapper
    return decorator
# 使用装饰器实现幂等性
@app.route('/api/order/create', methods=['POST'])
@idempotent_decorator(timeout=3600)
def create_order_api():
    """创建订单接口"""
    data = request.json
    # 业务逻辑
    order = create_order_in_db(
        user_id=data['user_id'],
        amount=data['amount'],
        items=data['items']
    )
    return order.to_dict()

幂等性设计最佳实践

幂等性键生成策略

import hashlib
from flask import request
def generate_idempotent_key():
    """生成幂等性键"""
    # 方案1:基于请求内容和时间戳
    request_content = f"{request.method}:{request.path}:{request.get_data()}"
    return hashlib.sha256(request_content.encode()).hexdigest()
    # 方案2:客户端提供唯一标识
    # return request.headers.get('X-Idempotent-Key')
    # 方案3:业务键值(如订单号、交易流水号)
    # return request.json.get('business_id')

并发控制实现

import threading
class IdempotentConcurrencyController:
    """并发控制"""
    def __init__(self, redis_client):
        self.redis = redis_client
        self.local_lock = threading.Lock()
    def acquire(self, key):
        """获取幂等性锁"""
        lock_key = f"idempotent:lock:{key}"
        # 尝试获取Redis锁
        acquired = self.redis.setnx(lock_key, "locked")
        if acquired:
            self.redis.expire(lock_key, 10)
            return True
        # Redis锁失败,使用本地锁降级
        return self.local_lock.acquire(blocking=False)
    def release(self, key):
        """释放锁"""
        lock_key = f"idempotent:lock:{key}"
        self.redis.delete(lock_key)
        self.local_lock.release()

测试代码

import pytest
import requests
class TestIdempotent:
    def test_duplicate_request(self):
        """测试重复请求"""
        # 第一次请求
        token = requests.get('/api/idempotent/token').json()['token']
        headers = {'Idempotent-Token': token}
        data = {'user_id': 1, 'amount': 100}
        # 第一次请求
        response1 = requests.post('/api/order/create', 
                                 json=data, headers=headers)
        assert response1.status_code == 201
        # 重复请求(使用相同Token)
        response2 = requests.post('/api/order/create', 
                                 json=data, headers=headers)
        assert response2.status_code == 200
        assert response2.json()['message'] == '请求正在处理中'

实现接口幂等性的关键点:

  1. 唯一标识:为每个请求生成唯一标识(Token、业务键、哈希值)
  2. 存储机制:使用Redis、数据库等存储请求处理状态
  3. 并发控制:通过分布式锁、数据库锁等确保原子性
  4. 过期策略:避免存储无限增长,设置合适的过期时间
  5. 异常处理:处理网络故障、服务重启等异常情况

抱歉,评论功能暂时关闭!