Python案例怎么记录接口访问日志?

wen python案例 20

本文目录导读:

Python案例怎么记录接口访问日志?

  1. 使用装饰器(适用于 Flask/FastAPI 等框架)
  2. 使用 FastAPI 中间件
  3. 使用 Django 中间件
  4. 高级日志配置(结构化日志)
  5. 日志文件管理
  6. 异步日志处理(高性能)
  7. 最佳实践建议

在 Python 中记录接口访问日志,通常使用装饰器或中间件的方式来实现,以下是几种常见的实现方案:

使用装饰器(适用于 Flask/FastAPI 等框架)

Flask 示例

from flask import Flask, request, jsonify
import logging
from functools import wraps
import json
from datetime import datetime
# Flask application object that the route decorators below attach to.
app = Flask(__name__)

# Logging setup: every record at INFO or above goes to a UTF-8 encoded
# file and is echoed to the console.
_handlers = [
    logging.FileHandler('api_access.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_handlers,
)

# Module-level logger used by the access-log decorator.
logger = logging.getLogger(__name__)
def log_access(func):
    """Decorate a Flask view so every call logs its request and its outcome.

    One INFO record is written before the view runs (method, URL, client IP,
    user agent, query arguments, raw body) and one after it returns (status
    code and elapsed time). If the view raises, the error is logged together
    with its traceback and elapsed time, then re-raised unchanged.

    Args:
        func: The view function to wrap.

    Returns:
        The wrapping function; ``functools.wraps`` copies the view's metadata
        onto it.
    """
    # Function-scope import keeps this snippet self-contained.
    import time

    def _status_code_of(result):
        """Return the status code carried by a view's return value."""
        if hasattr(result, 'status_code'):
            return result.status_code
        # Views may also return (body, status) or (body, status, headers).
        if (
            isinstance(result, tuple)
            and len(result) >= 2
            and isinstance(result[1], int)
        ):
            return result[1]
        return 200

    @wraps(func)
    def wrapper(*args, **kwargs):
        # perf_counter() is monotonic, so the measured duration cannot jump
        # or go negative when the system clock is adjusted.
        started = time.perf_counter()

        # Request details.
        # NOTE(review): the query string and raw body are logged verbatim;
        # mask credentials/tokens before using this in production.
        request_info = {
            'method': request.method,
            'url': request.url,
            'ip': request.remote_addr,
            'user_agent': request.headers.get('User-Agent'),
            'args': dict(request.args),
            'data': request.get_data(as_text=True) if request.data else None
        }
        logger.info("Request: %s", json.dumps(request_info, ensure_ascii=False))

        try:
            response = func(*args, **kwargs)
        except Exception as e:
            elapsed = time.perf_counter() - started
            # logger.exception() records the traceback alongside the message.
            logger.exception("Error: %s (after %.3fs)", e, elapsed)
            raise

        # Response details.
        duration = time.perf_counter() - started
        response_info = {
            'status_code': _status_code_of(response),
            'duration': f"{duration:.3f}s"
        }
        logger.info("Response: %s", json.dumps(response_info))
        return response

    return wrapper
@app.route('/api/users')
@log_access
def get_users():
    """Demo endpoint: return a fixed list of user names as JSON."""
    payload = {'users': ['Alice', 'Bob']}
    return jsonify(payload)


# Start the built-in server only when this file is executed as a script.
# NOTE(review): debug=True is meant for local development only.
if __name__ == '__main__':
    app.run(debug=True)

使用 FastAPI 中间件

from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import logging
import time
import json
# FastAPI application object that the middleware and routes attach to.
app = FastAPI()

# Logging setup: every record at INFO or above goes to a UTF-8 encoded
# file and is echoed to the console.
_handlers = [
    logging.FileHandler('api_access.log', encoding='utf-8'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO,
    handlers=_handlers,
)

# Module-level logger used by the access-log middleware.
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_request_middleware(request: Request, call_next):
    """Log each HTTP request and its response around the downstream handler.

    Writes one INFO record with the request details before dispatching and
    one with status code, duration and response headers afterwards. If the
    downstream handler raises, the error is logged with its traceback and a
    generic JSON 500 response is returned instead.

    Args:
        request: The incoming request.
        call_next: Awaitable that runs the rest of the application and
            returns its response.

    Returns:
        The downstream response, or a ``JSONResponse`` with status 500 when
        the downstream handler raised.
    """
    # perf_counter() is monotonic, so the duration is immune to clock changes.
    start_time = time.perf_counter()

    # Read the request body. This buffers the whole body in memory.
    # NOTE(review): confirm the installed Starlette version replays the
    # cached body to downstream handlers after it is read here.
    body = await request.body()

    # request.client is None when the server supplies no peer address.
    client = request.client

    # Request details.
    # NOTE(review): body and query string are logged verbatim; mask
    # credentials/tokens before using this in production.
    request_info = {
        'method': request.method,
        'url': str(request.url),
        'ip': client.host if client else None,
        'user_agent': request.headers.get('user-agent'),
        'path_params': request.path_params,
        'query_params': dict(request.query_params),
        # errors='replace' keeps binary uploads from raising
        # UnicodeDecodeError before the request is even dispatched.
        'body': body.decode('utf-8', errors='replace') if body else None
    }
    logger.info("Request: %s", json.dumps(request_info, ensure_ascii=False))

    try:
        response = await call_next(request)
    except Exception as e:
        # logger.exception() keeps the traceback that logger.error() dropped.
        logger.exception("Error processing request: %s", e)
        return JSONResponse(
            status_code=500,
            content={"detail": "Internal Server Error"}
        )

    # Response details.
    # NOTE(review): response headers may include Set-Cookie values.
    process_time = time.perf_counter() - start_time
    response_info = {
        'status_code': response.status_code,
        'duration': f"{process_time:.3f}s",
        'headers': dict(response.headers)
    }
    logger.info("Response: %s", json.dumps(response_info))
    return response
@app.get('/api/users')
async def get_users():
    """Demo endpoint: return a fixed list of user names."""
    names = ['Alice', 'Bob']
    return {'users': names}

使用 Django 中间件

# middleware.py
import logging
import time
import json
from datetime import datetime
logger = logging.getLogger(__name__)


class APIAccessLogMiddleware:
    """Middleware callable that logs every request and its response.

    The instance is constructed with the next handler in the chain
    (``get_response``) and is then called once per request.
    """

    def __init__(self, get_response):
        """Store the downstream handler that produces the response."""
        self.get_response = get_response

    def __call__(self, request):
        """Log the request, run the downstream handler, log the response.

        Args:
            request: The incoming request object.

        Returns:
            The response produced by ``get_response``, unmodified.
        """
        # perf_counter() is monotonic, so the duration is immune to
        # system clock adjustments.
        start_time = time.perf_counter()

        # Request details.
        # NOTE(review): the body is logged verbatim; mask credentials/tokens
        # before using this in production.
        raw_body = request.body
        request_info = {
            'method': request.method,
            'path': request.path,
            'ip': self.get_client_ip(request),
            'user_agent': request.META.get('HTTP_USER_AGENT'),
            'query_params': dict(request.GET),
            # errors='replace' keeps binary bodies (e.g. file uploads) from
            # raising UnicodeDecodeError and failing the whole request.
            'body': raw_body.decode('utf-8', errors='replace') if raw_body else None
        }
        logger.info("Request: %s", json.dumps(request_info, ensure_ascii=False))

        response = self.get_response(request)

        # Response details.
        duration = time.perf_counter() - start_time
        response_info = {
            'status_code': response.status_code,
            'duration': f"{duration:.3f}s",
            'content_type': response.get('Content-Type')
        }
        logger.info("Response: %s", json.dumps(response_info))
        return response

    def get_client_ip(self, request):
        """Return the client IP, preferring the first X-Forwarded-For entry.

        Falls back to ``REMOTE_ADDR`` when the header is absent or its first
        entry is blank.

        NOTE(review): X-Forwarded-For is supplied by the client unless a
        trusted proxy overwrites it — confirm the deployment does so.
        """
        x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
        if x_forwarded_for:
            # Entries are comma separated and usually followed by a space.
            first_hop = x_forwarded_for.split(',')[0].strip()
            if first_hop:
                return first_hop
        return request.META.get('REMOTE_ADDR')

高级日志配置(结构化日志)

import structlog
from structlog.processors import JSONRenderer
import sys
# Configure structlog.
# Human-friendly console output on an interactive terminal, JSON otherwise.
if sys.stderr.isatty():
    _renderer = structlog.dev.ConsoleRenderer()
else:
    _renderer = JSONRenderer()

# Each event gets its level name and an ISO timestamp before rendering.
_processors = [
    structlog.stdlib.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    _renderer,
]

# NOTE(review): LoggerFactory comes from structlog.stdlib, so the standard
# logging module's level/handlers presumably still decide what is emitted —
# confirm logging is configured elsewhere.
structlog.configure(
    processors=_processors,
    context_class=dict,
    logger_factory=structlog.stdlib.LoggerFactory(),
    wrapper_class=structlog.stdlib.BoundLogger,
    cache_logger_on_first_use=True,
)

logger = structlog.get_logger()
# Usage example
@app.middleware("http")
async def log_middleware(request: Request, call_next):
    """Emit one structured log event per request.

    Binds method, URL and client IP to the logger, runs the downstream
    handler, then logs ``request_completed`` with status code and duration.
    If the handler raises, ``request_failed`` is logged and the exception is
    re-raised unchanged.

    Args:
        request: The incoming request.
        call_next: Awaitable that runs the rest of the application.

    Returns:
        The downstream response, unmodified.
    """
    # perf_counter() is monotonic, so the duration is immune to clock changes.
    start = time.perf_counter()

    # request.client is None when the server supplies no peer address.
    client = request.client
    log = logger.bind(
        method=request.method,
        url=str(request.url),
        ip=client.host if client else None
    )

    try:
        response = await call_next(request)
    except Exception as exc:
        # Without this branch a failing request would leave no log event.
        log.error(
            "request_failed",
            error=str(exc),
            duration=time.perf_counter() - start
        )
        raise

    log.info(
        "request_completed",
        status_code=response.status_code,
        duration=time.perf_counter() - start
    )
    return response

日志文件管理

import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# Option A: rotate by file size.
handler = RotatingFileHandler(
    'api_access.log',
    maxBytes=1024*1024*10,  # 10MB
    backupCount=5,
    encoding='utf-8'
)

# Option B: rotate by time. Only ONE of the two handlers should be attached
# to a logger, because both target the same file. delay=True postpones
# opening the file until the first record is emitted, so this unattached
# alternative does not hold a second handle on 'api_access.log'.
time_handler = TimedRotatingFileHandler(
    'api_access.log',
    when='midnight',  # rotate every day at midnight
    interval=1,
    backupCount=30,  # keep 30 rotated files
    encoding='utf-8',
    delay=True
)

# Both handlers share one format, so either can be attached interchangeably.
formatter = logging.Formatter(
    '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
time_handler.setFormatter(formatter)

# Attach the size-based handler; to rotate by time instead, attach
# `time_handler` in its place.
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)

异步日志处理(高性能)

import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
class AsyncLogger:
    """Logger wrapper that performs file writes on a thread pool.

    Coroutines await :meth:`info` / :meth:`error`; the blocking logging call
    runs in a worker thread so the event loop is not blocked by file I/O.
    """

    def __init__(self, log_file='api_access.log'):
        """Create the worker pool and attach a file handler for *log_file*."""
        self.executor = ThreadPoolExecutor(max_workers=2)
        self.logger = logging.getLogger(__name__)
        self.setup_logger(log_file)

    def setup_logger(self, log_file):
        """Attach a UTF-8 file handler for *log_file* and set level INFO.

        The underlying logger is shared by every ``AsyncLogger`` in this
        module, so a handler is added only if none already targets the same
        file; otherwise each extra instance would duplicate every log line.
        """
        # Function-scope import keeps this snippet self-contained.
        import os

        # FileHandler stores the absolute path in `baseFilename`.
        target = os.path.abspath(os.fspath(log_file))
        already_attached = any(
            isinstance(existing, logging.FileHandler)
            and existing.baseFilename == target
            for existing in self.logger.handlers
        )
        if not already_attached:
            handler = logging.FileHandler(log_file, encoding='utf-8')
            formatter = logging.Formatter(
                '%(asctime)s - %(levelname)s - %(message)s'
            )
            handler.setFormatter(formatter)
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    async def log_async(self, level, message):
        """Log *message* via the logger method named *level* on a worker thread.

        Args:
            level: Name of a logger method, e.g. ``'info'`` or ``'error'``.
            message: The text to log.
        """
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine.
        loop = asyncio.get_running_loop()
        await loop.run_in_executor(
            self.executor,
            getattr(self.logger, level),
            message
        )

    async def info(self, message):
        """Log *message* at INFO level without blocking the event loop."""
        await self.log_async('info', message)

    async def error(self, message):
        """Log *message* at ERROR level without blocking the event loop."""
        await self.log_async('error', message)

    def close(self):
        """Shut down the worker pool after pending log calls finish."""
        self.executor.shutdown(wait=True)
# Usage example
async_logger = AsyncLogger()


@app.middleware("http")
async def log_middleware(request: Request, call_next):
    """Log one JSON line per request through the thread-backed logger.

    On success the line holds method, URL, status and duration. If the
    downstream handler raises, an error line (status 500 plus the error
    text) is logged and the exception is re-raised unchanged.

    Args:
        request: The incoming request.
        call_next: Awaitable that runs the rest of the application.

    Returns:
        The downstream response, unmodified.
    """
    # perf_counter() is monotonic, so the duration is immune to clock changes.
    start = time.perf_counter()

    try:
        response = await call_next(request)
    except Exception as exc:
        # Without this branch a failing request would never be logged.
        failure = {
            'method': request.method,
            'url': str(request.url),
            'status': 500,
            'duration': time.perf_counter() - start,
            'error': str(exc)
        }
        await async_logger.error(json.dumps(failure, ensure_ascii=False))
        raise

    log_data = {
        'method': request.method,
        'url': str(request.url),
        'status': response.status_code,
        'duration': time.perf_counter() - start
    }
    await async_logger.info(json.dumps(log_data, ensure_ascii=False))
    return response

最佳实践建议

日志记录内容建议

# Complete log-entry template
def create_log_entry(request, response, start_time, error=None):
    """Build one access-log record as a plain dict.

    Attributes are looked up defensively so the same template works with
    request/response objects exposing either ``path`` / ``GET`` / ``META`` /
    ``content`` or ``url.path`` / ``query_params`` / ``client`` / ``body``.
    Anything that is missing is recorded as ``None`` (or 0 for the size)
    instead of raising ``AttributeError``.

    Args:
        request: The request object being logged.
        response: The response object, or ``None`` if none was produced.
        start_time: ``time.time()`` value captured when handling started.
        error: Optional exception (or message) describing a failure.

    Returns:
        dict with the keys ``timestamp``, ``request_id``, ``user_id``,
        ``method``, ``path``, ``query_params``, ``ip``, ``user_agent``,
        ``status_code``, ``duration``, ``error`` and ``response_size``.
    """
    def _first_attr(obj, *names):
        """Return the first attribute in *names* that exists and is not None."""
        for name in names:
            value = getattr(obj, name, None)
            if value is not None:
                return value
        return None

    # Path: direct attribute first, then the path of a URL object.
    path = _first_attr(request, 'path')
    if path is None:
        path = getattr(getattr(request, 'url', None), 'path', None)

    # Query string mapping under either attribute name.
    query = _first_attr(request, 'GET', 'query_params')
    query_params = dict(query) if query is not None else {}

    # Client address: `client.host` first, then the REMOTE_ADDR entry.
    ip = getattr(getattr(request, 'client', None), 'host', None)
    if ip is None:
        meta = getattr(request, 'META', None)
        ip = meta.get('REMOTE_ADDR') if meta else None

    # NOTE(review): some frameworks reportedly raise AssertionError (not
    # AttributeError) from `request.user` when no auth layer is installed —
    # confirm; catching it keeps logging from failing either way.
    try:
        user = getattr(request, 'user', None)
    except AssertionError:
        user = None

    headers = getattr(request, 'headers', None)

    # Response payload under either attribute name; responses exposing
    # neither are recorded with size 0.
    payload = None
    if response is not None:
        payload = _first_attr(response, 'body', 'content')

    return {
        'timestamp': datetime.now().isoformat(),
        'request_id': getattr(request, 'request_id', None),  # request tracing ID
        'user_id': getattr(user, 'id', None),
        'method': request.method,
        'path': path,
        'query_params': query_params,
        'ip': ip,
        'user_agent': headers.get('user-agent') if headers is not None else None,
        'status_code': response.status_code if response is not None else 500,
        'duration': time.time() - start_time,
        'error': str(error) if error else None,
        'response_size': len(payload) if payload is not None else 0
    }

性能考虑

  1. 异步记录:使用异步方式写入日志文件
  2. 批量写入:将多条日志缓存后批量写入
  3. 使用专业日志系统:ELK Stack、Graylog 等
  4. 合理设置日志级别:访问日志本身使用 INFO 级别并写入独立的日志文件;其余应用日志在生产环境可只记录 WARNING 及以上级别

选择哪种方案取决于你的具体需求:

  • 简单项目:使用装饰器即可
  • 大型项目:推荐使用中间件 + 结构化日志
  • 高并发场景:使用异步日志处理

记得不要在日志中记录敏感信息(密码、令牌等),以防止安全漏洞。

抱歉,评论功能暂时关闭!