Python案例:如何实现文件分片?——从原理到代码实战
目录导读
文件分片的应用场景
在现实开发中,我们经常遇到超大文件传输或断点续传需求。

- 上传一个 2GB 的视频文件到云服务器,直接上传可能因网络波动失败,分片后每片 10MB,失败后仅重传该分片。
- 分布式存储系统中,将大文件切分成固定大小的块,分散存储在不同节点。
- 内存受限的嵌入式设备,无法一次性读取大文件,需要分片处理。
Python 案例能帮你快速实现这一机制,且代码简洁,无需第三方库也能完成基本功能。
核心原理:为什么需要分片?
文件分片的本质是将一个大文件按字节数均匀切割,每一片都是原文件的一个连续部分,分片后通常还需记录元数据(如分片序号、总片数、各片哈希值等),以便后续校验和合并。
关键参数:
chunk_size:每片的大小(如 4MB、10MB)total_chunks:总片数 = ceil(文件总大小 / chunk_size)
数据流过程:
原文件 → 分片1 | 分片2 | ... | 分片N → 合并 → 还原文件
Python实现分片的关键技术
1 使用二进制模式读写
open(file, 'rb') 和 open(file, 'wb') 能确保不因编码问题损坏文件。
2 利用 os.path.getsize() 获取文件大小
import os
file_size = os.path.getsize("large_file.mp4")
3 分片循环逻辑
利用 read(chunk_size) 每次读取固定字节,直到读取内容为空。
完整代码案例:分片与合并
以下代码实现了一个可复用的文件分片工具,支持自定义分片大小,并输出分片文件与元数据文件。
import os
import hashlib
import json
def split_file(file_path, chunk_size=10*1024*1024):
"""分片函数,默认每片10MB"""
file_size = os.path.getsize(file_path)
base_name = os.path.basename(file_path)
chunk_dir = f"{base_name}_chunks"
os.makedirs(chunk_dir, exist_ok=True)
meta = {
"original_name": base_name,
"chunk_size": chunk_size,
"total_chunks": 0,
"chunks": []
}
with open(file_path, 'rb') as f:
chunk_index = 0
while True:
data = f.read(chunk_size)
if not data:
break
chunk_file = os.path.join(chunk_dir, f"{base_name}.part{chunk_index}")
with open(chunk_file, 'wb') as chunk_f:
chunk_f.write(data)
# 记录每个分片的SHA256
sha256 = hashlib.sha256(data).hexdigest()
meta["chunks"].append({"index": chunk_index, "sha256": sha256})
chunk_index += 1
meta["total_chunks"] = chunk_index
meta_file = os.path.join(chunk_dir, "meta.json")
with open(meta_file, 'w') as mf:
json.dump(meta, mf, indent=2)
print(f"分片完成,共 {chunk_index} 片,保存至 {chunk_dir}")
return chunk_dir
def merge_file(chunk_dir):
"""合并分片,还原原文件"""
meta_file = os.path.join(chunk_dir, "meta.json")
with open(meta_file, 'r') as mf:
meta = json.load(mf)
output_path = f"recovered_{meta['original_name']}"
with open(output_path, 'wb') as out:
for i in range(meta["total_chunks"]):
part = os.path.join(chunk_dir, f"{meta['original_name']}.part{i}")
with open(part, 'rb') as pf:
out.write(pf.read())
print(f"合并完成,输出文件:{output_path}")
# 使用示例
split_file("example.zip", chunk_size=5*1024*1024)
merge_file("example.zip_chunks")
代码亮点
- 哈希校验:记录每个分片的SHA256,合并后可重新计算分片哈希以验证完整性。
- 元数据独立存储:
meta.json保存分片顺序与校验值,避免合并时依赖文件名顺序。 - 分片目录隔离:避免分片文件污染原文件目录。
常见问题与优化建议
问题1:分片过多导致文件列表混乱
解决:使用 chunk_size 参数合理调整,一般 5~50MB 为宜;对于超大文件(如 100GB),建议结合数据库记录分片信息。
问题2:磁盘空间不足
解决:分片后立即传输或删除处理完的分片;使用 tempfile 模块创建临时分片。
问题3:分片传输中断后如何恢复?
解决:在元数据中记录已传输分片序号,客户端请求从最后一个成功分片继续传输(断点续传)。
优化建议
- 使用
threading或asyncio实现并行分片/上传。 - 对于文本文件,仍用二进制模式分片,避免换行符被转义。
- 添加
try-except处理磁盘错误、权限问题。
问答环节
Q1:分片后文件是否必须合并?
A:不一定,如果分片用于流式处理(如视频分段播放),合并反而失去意义,但多数场景(上传/下载)最终需合并。
Q2:如何保证分片合并后的文件与原文件完全一致?
A:使用哈希校验,分片时计算每个分片的SHA256,合并后对全文件再计算一次哈希,对比是否与原始文件哈希相等。
Q3:Python分片能处理多大文件?
A:理论上无上限,取决于磁盘和内存,但过大的文件直接分片可能导致内存波动,建议使用 mmap(内存映射)优化性能。
Q4:为什么必须用二进制模式读取?
A:文本模式('r')会自动转换换行符(如 Windows 下 \r\n 转 \n),导致分片内容被篡改,二进制模式('rb')保持原始字节。
Q5:如果没有元数据文件,如何合并分片?
A:可约定分片文件命名规则(如 file.part0, file.part1),按文件名后缀的数字顺序合并,但无法校验完整性,且风险较大。
通过本文的Python案例实战,你已经掌握了文件分片的核心代码与思路,无论你是开发文件传输工具、构建分布式存储系统,还是实现断点续传,上述代码都可以作为坚实的技术基础,尝试修改 chunk_size 并测试不同大小文件,你将在实践中获得更深的体会。