Python实战案例与核心原理详解
目录导读
- 为什么需要文件分片上传?
- 文件分片上传的核心流程
- Python实现文件分片上传的完整案例
- 代码逐段详解:切片、上传、合并
- 常见问题与优化技巧(含问答)
- 安全性与并发控制建议
为什么需要文件分片上传?
在网络传输中,大文件(如视频、高清图片、日志压缩包)直接上传会面临三大问题:

- 网络波动导致失败:一旦连接中断,整个文件需重传
- 超时限制:服务器或浏览器可能对单次请求有大小限制(如Nginx默认1MB)
- 内存溢出:Python读取整个大文件会耗尽内存
分片上传的核心思想是:将大文件切分为多个小块(通常1-10MB),分别上传,最后在服务端合并,这能显著提升上传的稳定性和用户体验。
文件分片上传的核心流程
- 客户端切片:按固定大小(如5MB)读取文件片段,生成唯一标识(如文件MD5+分片序号)
- 分片上传:将每个分片通过HTTP POST请求发送到服务器
- 服务端暂存:服务器接收分片,写入临时目录,记录已上传的分片列表
- 合并确认:所有分片上传完成后,客户端发起合并请求,服务端将分片按序合并为原始文件
- 清理临时文件:合并成功后删除分片文件
Python实现文件分片上传的完整案例
以下案例使用requests库实现客户端上传,Flask实现服务端接收。代码可直接运行测试。
客户端代码(client.py)
import os
import hashlib
import requests
def split_and_upload(file_path, chunk_size=5*1024*1024, server_url="http://127.0.0.1:5000/upload"):
"""
将文件分片并逐片上传到服务器
:param file_path: 本地文件路径
:param chunk_size: 每片大小(默认5MB)
:param server_url: 服务器上传接口
"""
# 计算文件MD5作为全局标识
file_md5 = hashlib.md5(open(file_path, 'rb').read()).hexdigest()
total_size = os.path.getsize(file_path)
chunk_count = (total_size + chunk_size - 1) // chunk_size
with open(file_path, 'rb') as f:
for chunk_index in range(chunk_count):
start = chunk_index * chunk_size
f.seek(start)
chunk_data = f.read(chunk_size)
# 构造分片元数据
params = {
'file_md5': file_md5,
'chunk_index': chunk_index,
'total_chunks': chunk_count,
'filename': os.path.basename(file_path)
}
# 上传分片(注意:files参数以'chunk'字段发送二进制数据)
response = requests.post(
server_url,
files={'chunk': chunk_data},
data=params
)
if response.status_code == 200:
print(f"分片 {chunk_index+1}/{chunk_count} 上传成功")
else:
print(f"分片 {chunk_index+1} 上传失败: {response.text}")
return False
# 所有分片上传完毕,通知服务器合并
merge_url = server_url + "/merge"
merge_resp = requests.post(merge_url, data={'file_md5': file_md5})
if merge_resp.status_code == 200:
print("文件合并成功!")
return True
else:
print(f"合并失败: {merge_resp.text}")
return False
服务端代码(server.py)
from flask import Flask, request, jsonify
import os
app = Flask(__name__)
UPLOAD_DIR = "./temp_chunks"
os.makedirs(UPLOAD_DIR, exist_ok=True)
@app.route('/upload', methods=['POST'])
def upload_chunk():
file_md5 = request.form.get('file_md5')
chunk_index = int(request.form.get('chunk_index'))
total_chunks = int(request.form.get('total_chunks'))
filename = request.form.get('filename')
chunk_data = request.files.get('chunk')
if not all([file_md5, chunk_index, total_chunks, chunk_data]):
return jsonify({"error": "缺少参数"}), 400
# 按分片序号保存为临时文件(md5_0, md5_1,...)
chunk_dir = os.path.join(UPLOAD_DIR, file_md5)
os.makedirs(chunk_dir, exist_ok=True)
chunk_path = os.path.join(chunk_dir, str(chunk_index))
chunk_data.save(chunk_path)
# 检查是否所有分片已上传
uploaded_count = len(os.listdir(chunk_dir))
if uploaded_count == total_chunks:
return jsonify({"status": "chunk_received", "all_uploaded": True})
return jsonify({"status": "chunk_received", "all_uploaded": False})
@app.route('/upload/merge', methods=['POST'])
def merge_chunks():
file_md5 = request.form.get('file_md5')
chunk_dir = os.path.join(UPLOAD_DIR, file_md5)
if not os.path.exists(chunk_dir):
return jsonify({"error": "分片目录不存在"}), 404
# 按序号排序并合并
chunk_files = sorted(os.listdir(chunk_dir), key=lambda x: int(x))
original_filename = request.form.get('filename', 'merged_file')
output_path = os.path.join(UPLOAD_DIR, original_filename)
with open(output_path, 'wb') as out_f:
for chunk_name in chunk_files:
chunk_path = os.path.join(chunk_dir, chunk_name)
with open(chunk_path, 'rb') as in_f:
out_f.write(in_f.read())
# 删除临时分片目录
import shutil
shutil.rmtree(chunk_dir)
return jsonify({"status": "merged", "path": output_path})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
代码逐段详解:切片、上传、合并
客户端切片逻辑
chunk_size:控制每片大小,建议5-10MB,平衡网络请求次数与失败重试成本file_md5:确保不同文件即使同名也能区分(避免合并错乱)f.seek(start):直接从指定位置读取,避免一次性加载整个文件
上传与容错
- 每个分片通过
requests.post的files参数发送,Flask端用request.files['chunk']接收 - 若单个分片上传失败,可加入重试机制(最多3次,间隔2秒)
服务端合并关键点
- 排序合并:分片名使用序号(0,1,2...),
sorted按整数排序保证顺序 - 原子性检查:合并前检查分片数量是否等于
total_chunks,避免缺片文件
前端调用示例
# 调用客户端
split_and_upload("large_video.mp4", chunk_size=10*1024*1024)
常见问题与优化技巧(问答)
问1:如何保证分片上传的完整性?
答:客户端可为每个分片计算SHA256值,服务端在接收后验证,也可在合并完成后,对比整体文件的MD5。
问2:网络中断后如何断点续传?
答:服务端增加/check接口,返回已上传的分片序号列表,客户端在开始上传前先查询缺失分片,只上传未完成的部分。
问3:分片上传速度太慢怎么办?
答:启用并发上传,使用ThreadPoolExecutor同时上传3-5个分片,注意控制并发数,避免触发服务器限流。
问4:分片存储临时目录会不会占满磁盘?
答:设置定时任务(如cron)删除超过24小时的临时分片目录,合并成功后立即清理。
问5:如何防止恶意上传?
答:增加身份验证Token(如JWT),限制单文件最大50GB,并对分片数量做上限检查(如最大1000片)。
安全性与并发控制建议
- 接口鉴权:所有上传/合并接口需携带有效的Token,防止非法调用
- 并发处理:服务端使用Redis记录每个文件的上传进度,避免多用户在合并阶段产生冲突
- 文件类型校验:合并完成后检查MIME类型或魔数,防止上传可执行文件
- 日志记录:详细记录每个分片的上传时间、IP、文件名称,便于排查异常
通过上述案例和原理,你已掌握Python实现文件分片上传的核心技术,实际生产环境中,可将客户端集成到Web应用中(如Vue + Axios),服务端改用Gunicorn部署以支持高并发,这种方案在视频平台、网盘系统中被广泛使用,是处理大文件传输的“标准答案”。