本文目录导读:

在 Python 中记录接口访问日志,通常使用 装饰器 或 中间件 的方式来实现,以下是几种常见的实现方案:
使用装饰器(适用于 Flask/FastAPI 等框架)
Flask 示例
from flask import Flask, request, jsonify
import logging
from functools import wraps
import json
from datetime import datetime
app = Flask(__name__)
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('api_access.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def log_access(func):
@wraps(func)
def wrapper(*args, **kwargs):
start_time = datetime.now()
# 记录请求信息
request_info = {
'method': request.method,
'url': request.url,
'ip': request.remote_addr,
'user_agent': request.headers.get('User-Agent'),
'args': dict(request.args),
'data': request.get_data(as_text=True) if request.data else None
}
logger.info(f"Request: {json.dumps(request_info, ensure_ascii=False)}")
try:
response = func(*args, **kwargs)
end_time = datetime.now()
duration = (end_time - start_time).total_seconds()
# 记录响应信息
response_info = {
'status_code': response.status_code if hasattr(response, 'status_code') else 200,
'duration': f"{duration:.3f}s"
}
logger.info(f"Response: {json.dumps(response_info)}")
return response
except Exception as e:
logger.error(f"Error: {str(e)}")
raise
return wrapper
@app.route('/api/users')
@log_access
def get_users():
return jsonify({'users': ['Alice', 'Bob']})
if __name__ == '__main__':
app.run(debug=True)
使用 FastAPI 中间件
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse
import logging
import time
import json
app = FastAPI()
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('api_access.log', encoding='utf-8'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
@app.middleware("http")
async def log_request_middleware(request: Request, call_next):
start_time = time.time()
# 获取请求体
body = await request.body()
# 记录请求信息
request_info = {
'method': request.method,
'url': str(request.url),
'ip': request.client.host,
'user_agent': request.headers.get('user-agent'),
'path_params': request.path_params,
'query_params': dict(request.query_params),
'body': body.decode('utf-8') if body else None
}
logger.info(f"Request: {json.dumps(request_info, ensure_ascii=False)}")
try:
response = await call_next(request)
# 记录响应信息
process_time = time.time() - start_time
response_info = {
'status_code': response.status_code,
'duration': f"{process_time:.3f}s",
'headers': dict(response.headers)
}
logger.info(f"Response: {json.dumps(response_info)}")
return response
except Exception as e:
logger.error(f"Error processing request: {str(e)}")
return JSONResponse(
status_code=500,
content={"detail": "Internal Server Error"}
)
@app.get('/api/users')
async def get_users():
return {'users': ['Alice', 'Bob']}
使用 Django 中间件
# middleware.py
import logging
import time
import json
from datetime import datetime
logger = logging.getLogger(__name__)
class APIAccessLogMiddleware:
def __init__(self, get_response):
self.get_response = get_response
def __call__(self, request):
start_time = time.time()
# 记录请求信息
request_info = {
'method': request.method,
'path': request.path,
'ip': self.get_client_ip(request),
'user_agent': request.META.get('HTTP_USER_AGENT'),
'query_params': dict(request.GET),
'body': request.body.decode('utf-8') if request.body else None
}
logger.info(f"Request: {json.dumps(request_info, ensure_ascii=False)}")
response = self.get_response(request)
# 记录响应信息
duration = time.time() - start_time
response_info = {
'status_code': response.status_code,
'duration': f"{duration:.3f}s",
'content_type': response.get('Content-Type')
}
logger.info(f"Response: {json.dumps(response_info)}")
return response
def get_client_ip(self, request):
x_forwarded_for = request.META.get('HTTP_X_FORWARDED_FOR')
if x_forwarded_for:
ip = x_forwarded_for.split(',')[0]
else:
ip = request.META.get('REMOTE_ADDR')
return ip
高级日志配置(结构化日志)
import structlog
from structlog.processors import JSONRenderer
import sys
# 配置 structlog
structlog.configure(
processors=[
structlog.stdlib.add_log_level,
structlog.processors.TimeStamper(fmt="iso"),
structlog.dev.ConsoleRenderer() if sys.stderr.isatty() else JSONRenderer()
],
context_class=dict,
logger_factory=structlog.stdlib.LoggerFactory(),
wrapper_class=structlog.stdlib.BoundLogger,
cache_logger_on_first_use=True,
)
logger = structlog.get_logger()
# 使用示例
@app.middleware("http")
async def log_middleware(request: Request, call_next):
start = time.time()
log = logger.bind(
method=request.method,
url=str(request.url),
ip=request.client.host
)
response = await call_next(request)
log.info(
"request_completed",
status_code=response.status_code,
duration=time.time() - start
)
return response
日志文件管理
import logging
from logging.handlers import RotatingFileHandler, TimedRotatingFileHandler
# 按文件大小轮转
handler = RotatingFileHandler(
'api_access.log',
maxBytes=1024*1024*10, # 10MB
backupCount=5,
encoding='utf-8'
)
# 按时间轮转
time_handler = TimedRotatingFileHandler(
'api_access.log',
when='midnight', # 每天午夜轮转
interval=1,
backupCount=30, # 保留30天
encoding='utf-8'
)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger = logging.getLogger(__name__)
logger.addHandler(handler)
logger.setLevel(logging.INFO)
异步日志处理(高性能)
import asyncio
from concurrent.futures import ThreadPoolExecutor
import logging
class AsyncLogger:
def __init__(self, log_file='api_access.log'):
self.executor = ThreadPoolExecutor(max_workers=2)
self.logger = logging.getLogger(__name__)
self.setup_logger(log_file)
def setup_logger(self, log_file):
handler = logging.FileHandler(log_file, encoding='utf-8')
formatter = logging.Formatter(
'%(asctime)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
self.logger.addHandler(handler)
self.logger.setLevel(logging.INFO)
async def log_async(self, level, message):
loop = asyncio.get_event_loop()
await loop.run_in_executor(
self.executor,
getattr(self.logger, level),
message
)
async def info(self, message):
await self.log_async('info', message)
async def error(self, message):
await self.log_async('error', message)
# 使用示例
async_logger = AsyncLogger()
@app.middleware("http")
async def log_middleware(request: Request, call_next):
start = time.time()
response = await call_next(request)
log_data = {
'method': request.method,
'url': str(request.url),
'status': response.status_code,
'duration': time.time() - start
}
await async_logger.info(json.dumps(log_data, ensure_ascii=False))
return response
最佳实践建议
日志记录内容建议
# 完整的日志记录模板
def create_log_entry(request, response, start_time, error=None):
return {
'timestamp': datetime.now().isoformat(),
'request_id': getattr(request, 'request_id', None), # 请求追踪ID
'user_id': request.user.id if hasattr(request, 'user') else None,
'method': request.method,
'path': request.path,
'query_params': dict(request.GET),
'ip': request.client.host if hasattr(request, 'client') else None,
'user_agent': request.headers.get('user-agent'),
'status_code': response.status_code if response else 500,
'duration': time.time() - start_time,
'error': str(error) if error else None,
'response_size': len(response.body) if response else 0
}
性能考虑
- 异步记录:使用异步方式写入日志文件
- 批量写入:将多条日志缓存后批量写入
- 使用专业日志系统:ELK Stack、Graylog 等
- 合理设置日志级别:生产环境只记录 WARNING 及以上级别
选择哪种方案取决于你的具体需求:
- 简单项目:使用装饰器即可
- 大型项目:推荐使用中间件 + 结构化日志
- 高并发场景:使用异步日志处理
记得不要在日志中记录敏感信息(密码、令牌等),以防止安全漏洞。