Python案例怎么处理接口超时?

wen python案例 34

本文目录导读:

Python案例怎么处理接口超时?

  1. 客户端设置超时(最常用)
  2. 重试机制(处理偶发超时)
  3. 全局超时设置(适用于大批量请求)
  4. 异步批量请求的超时处理
  5. 服务端超时设置(如果控制服务端)
  6. 最佳实践总结
  7. 关键原则

处理Python接口超时问题,通常从客户端设置服务端处理两个维度入手,以下是具体的解决方案和最佳实践。

客户端设置超时(最常用)

使用 requests

requests 库是Python中最常用的HTTP客户端,设置超时非常简单:

import requests
from requests.exceptions import Timeout, ConnectionError, RequestException
try:
    # 设置连接超时和读取超时
    response = requests.get(
        'https://api.example.com/data',
        timeout=(3, 10)  # (连接超时, 读取超时)
    )
    # 或者简单设置
    # response = requests.get('https://api.example.com/data', timeout=5)
    print(response.json())
except Timeout:
    print("请求超时,请稍后重试")
except ConnectionError:
    print("连接失败,请检查网络")
except RequestException as e:
    print(f"请求异常: {e}")

重要说明

  • timeout=(connect_timeout, read_timeout):连接超时和读取超时分开设置
  • timeout=5:连接和读取都设置为5秒
  • 不设置timeout会导致程序永远等待

使用 aiohttp (异步请求)

import aiohttp
import asyncio
async def fetch_data():
    timeout = aiohttp.ClientTimeout(total=10, connect=5)
    try:
        async with aiohttp.ClientSession(timeout=timeout) as session:
            async with session.get('https://api.example.com/data') as response:
                data = await response.json()
                return data
    except asyncio.TimeoutError:
        print("异步请求超时")
    except aiohttp.ClientError as e:
        print(f"请求错误: {e}")
# 运行
result = asyncio.run(fetch_data())

重试机制(处理偶发超时)

使用 tenacity

from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception_type
from requests.exceptions import Timeout
import requests
@retry(
    stop=stop_after_attempt(3),  # 最多重试3次
    wait=wait_exponential(multiplier=1, min=1, max=10),  # 指数退避等待
    retry=retry_if_exception_type(Timeout)  # 仅对超时异常重试
)
def fetch_with_retry():
    response = requests.get('https://api.example.com/data', timeout=5)
    return response.json()
try:
    data = fetch_with_retry()
    print(data)
except Exception as e:
    print(f"所有重试都失败了: {e}")

手动实现重试

import time
from requests.exceptions import Timeout
def fetch_with_manual_retry(max_retries=3, base_delay=1):
    for attempt in range(max_retries):
        try:
            response = requests.get('https://api.example.com/data', timeout=5)
            return response.json()
        except Timeout:
            if attempt == max_retries - 1:
                raise  # 最后一次失败,向上抛出异常
            # 指数退避
            delay = base_delay * (2 ** attempt)
            print(f"第{attempt+1}次超时,等待{delay}秒后重试...")
            time.sleep(delay)

全局超时设置(适用于大批量请求)

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
def create_session_with_retry():
    session = requests.Session()
    # 配置重试策略
    retry_strategy = Retry(
        total=3,  # 总重试次数
        backoff_factor=1,  # 退避因子
        status_forcelist=[500, 502, 503, 504],  # 需要重试的状态码
        allowed_methods=["GET", "POST"]
    )
    # 创建适配器
    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,
        pool_maxsize=20
    )
    session.mount('http://', adapter)
    session.mount('https://', adapter)
    return session
# 使用会话
session = create_session_with_retry()
try:
    response = session.get('https://api.example.com/data', timeout=10)
    print(response.json())
finally:
    session.close()

异步批量请求的超时处理

import asyncio
import aiohttp
async def fetch_with_timeout(session, url, timeout=10):
    try:
        async with asyncio.timeout(timeout):
            async with session.get(url) as response:
                return await response.json()
    except asyncio.TimeoutError:
        return {"error": f"请求超时: {url}"}
async def batch_fetch(urls):
    connector = aiohttp.TCPConnector(limit=10)  # 限制并发数
    timeout = aiohttp.ClientTimeout(total=30)  # 全局超时
    async with aiohttp.ClientSession(connector=connector, timeout=timeout) as session:
        tasks = [fetch_with_timeout(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        return results
# 使用
urls = [
    'https://api1.example.com/data',
    'https://api2.example.com/data',
    'https://api3.example.com/data'
]
results = asyncio.run(batch_fetch(urls))

服务端超时设置(如果控制服务端)

Flask 示例

from flask import Flask, jsonify
import time
app = Flask(__name__)
@app.route('/slow-api')
def slow_api():
    # 模拟耗时操作
    timeout = 30  # 设置超时时间
    start_time = time.time()
    # 执行耗时操作
    result = expensive_operation()
    elapsed = time.time() - start_time
    if elapsed > timeout:
        return jsonify({"error": "请求超时"}), 504
    return jsonify({"result": result})
# 使用超时装饰器
from functools import wraps
import signal
def timeout_handler(seconds=30):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # 设置信号处理器
            signal.signal(signal.SIGALRM, lambda signum, frame: 
                (_ for _ in ()).throw(TimeoutError("请求处理超时")))
            signal.alarm(seconds)
            try:
                result = func(*args, **kwargs)
                signal.alarm(0)  # 取消闹钟
                return result
            except TimeoutError:
                return jsonify({"error": "请求超时"}), 504
        return wrapper
    return decorator
@app.route('/timeout-api')
@timeout_handler(seconds=10)
def timeout_api():
    # 可能超时的操作
    time.sleep(15)  # 模拟耗时操作
    return jsonify({"status": "success"})

最佳实践总结

import logging
from dataclasses import dataclass
from typing import Optional, Any
import requests
from tenacity import retry, stop_after_attempt, wait_exponential
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
@dataclass
class APIClientConfig:
    """API客户端配置"""
    base_url: str
    timeout: tuple = (5, 30)  # (连接超时, 读取超时)
    max_retries: int = 3
    retry_backoff: float = 1.0
class RobustAPIClient:
    """健壮的API客户端"""
    def __init__(self, config: APIClientConfig):
        self.config = config
        self.session = self._create_session()
    def _create_session(self):
        session = requests.Session()
        session.headers.update({
            'User-Agent': 'RobustAPIClient/1.0',
            'Accept': 'application/json'
        })
        return session
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10),
        retry_error_callback=lambda retry_state: None
    )
    def get(self, endpoint: str, **kwargs) -> Optional[Any]:
        """带超时和重试的GET请求"""
        url = f"{self.config.base_url}/{endpoint.lstrip('/')}"
        try:
            response = self.session.get(
                url,
                timeout=self.config.timeout,
                **kwargs
            )
            response.raise_for_status()
            return response.json()
        except requests.exceptions.Timeout:
            logger.warning(f"请求超时: {url}")
            raise  # 让tenacity重试
        except requests.exceptions.RequestException as e:
            logger.error(f"请求失败: {url}, 错误: {e}")
            if hasattr(e.response, 'status_code'):
                if e.response.status_code in [500, 502, 503, 504]:
                    raise  # 服务器错误,重试
            return None
    def close(self):
        self.session.close()
# 使用示例
config = APIClientConfig(
    base_url='https://api.example.com',
    timeout=(3, 10),
    max_retries=3
)
client = RobustAPIClient(config)
try:
    data = client.get('/data')
    if data:
        print("成功获取数据:", data)
    else:
        print("获取数据失败")
finally:
    client.close()

关键原则

  1. 始终设置超时:不设置timeout是常见的bug来源
  2. 区分连接超时和读取超时:连接超时通常设置较短(1-5秒),读取超时根据业务设置(10-30秒)
  3. 实现指数退避重试:避免同时大量重试导致服务器雪崩
  4. 记录超时日志:便于排查问题
  5. 设置合理的并发限制:防止资源耗尽
  6. 对于关键业务,考虑使用断路器模式:当服务持续超时时,暂时停止请求

这样可以有效处理绝大多数接口超时问题,提高系统的稳定性和可用性。

抱歉,评论功能暂时关闭!