本文目录导读:

基于文件行数分割(通用型)
Python脚本
#!/usr/bin/env python3
# split_large_file.py
import os
import sys
from pathlib import Path
def split_by_lines(input_file, lines_per_file=100000):
    """Split a UTF-8 text file into numbered parts by line count.

    Parts are written to a ``<stem>_split`` directory next to the input
    file and are named ``<stem>_part_0001.txt``, ``<stem>_part_0002.txt``, ...

    Args:
        input_file: Path of the file to split.
        lines_per_file: Maximum number of lines stored in each part.

    Returns:
        The number of part files written, or ``None`` when the input file
        does not exist (a message is printed in that case).
    """
    input_path = Path(input_file)
    if not input_path.exists():
        print(f"文件 {input_file} 不存在")
        return None

    # Output files share the input's stem as their prefix.
    base_name = input_path.stem
    output_dir = input_path.parent / f"{base_name}_split"
    output_dir.mkdir(exist_ok=True)

    file_count = 0
    current_lines = []

    def _flush():
        """Write the buffered lines as the next part and empty the buffer."""
        nonlocal file_count
        file_count += 1
        output_file = output_dir / f"{base_name}_part_{file_count:04d}.txt"
        # newline='' writes the line endings exactly as they were read.
        with open(output_file, 'w', encoding='utf-8', newline='') as out:
            out.writelines(current_lines)
        print(f"已创建: {output_file}")
        current_lines.clear()

    # newline='' disables newline translation, so CRLF / CR endings in the
    # source survive into the parts instead of being rewritten as LF.
    with open(input_file, 'r', encoding='utf-8', newline='') as f:
        for line in f:
            current_lines.append(line)
            if len(current_lines) >= lines_per_file:
                _flush()

    # Whatever is left over forms the final, shorter part.
    if current_lines:
        _flush()

    print(f"完成! 共分割为 {file_count} 个文件")
    return file_count
if __name__ == "__main__":
    # Command line: <file> [lines-per-part]; the line count defaults to 100000.
    if not sys.argv[1:]:
        print("用法: python split_large_file.py <文件名> [行数]")
        sys.exit(1)
    input_file = sys.argv[1]
    lines_per_file = 100000
    if len(sys.argv) > 2:
        lines_per_file = int(sys.argv[2])
    split_by_lines(input_file, lines_per_file)
按文件大小分割
#!/usr/bin/env python3
# split_by_size.py
import os
import sys
from pathlib import Path
def split_by_file_size(input_file, max_size_mb=100):
    """Split a UTF-8 text file into parts of roughly ``max_size_mb`` megabytes.

    The split is approximate: input is cut only at line ends, and a part is
    closed once its accumulated UTF-8 size reaches the limit, so a part can
    exceed the limit by up to one line. Parts are written to a
    ``<stem>_split_by_size`` directory next to the input file.

    Args:
        input_file: Path of the file to split.
        max_size_mb: Size threshold per part, in MiB (1024 * 1024 bytes).

    Returns:
        The number of part files written.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist. The check runs
            before the output directory is created, so a failed call leaves
            nothing behind.
    """
    input_path = Path(input_file)
    if not input_path.exists():
        raise FileNotFoundError(f"文件 {input_file} 不存在")

    max_bytes = max_size_mb * 1024 * 1024
    base_name = input_path.stem
    output_dir = input_path.parent / f"{base_name}_split_by_size"
    output_dir.mkdir(exist_ok=True)

    file_count = 0
    current_size = 0
    current_lines = []

    def _write_part():
        """Write the buffered lines as the next part and return its path."""
        nonlocal file_count
        file_count += 1
        output_file = output_dir / f"{base_name}_part_{file_count:04d}.txt"
        with open(output_file, 'w', encoding='utf-8', newline='') as out:
            out.writelines(current_lines)
        return output_file

    # newline='' keeps the original line endings, so the encoded length of
    # each line matches the bytes that end up in the part file.
    with open(input_file, 'r', encoding='utf-8', newline='') as f:
        for line in f:
            current_lines.append(line)
            current_size += len(line.encode('utf-8'))
            if current_size >= max_bytes:
                output_file = _write_part()
                print(f"已创建: {output_file} ({current_size/1024/1024:.2f}MB)")
                current_lines = []
                current_size = 0

    # Remaining lines that never reached the threshold form the last part.
    if current_lines:
        output_file = _write_part()
        print(f"已创建: {output_file}")

    print(f"完成! 共分割为 {file_count} 个文件")
    return file_count
if __name__ == "__main__":
    # Command line: <file> [max-MB]; the size limit defaults to 100.
    if not sys.argv[1:]:
        print("用法: python split_by_size.py <文件名> [最大MB]")
        sys.exit(1)
    input_file = sys.argv[1]
    max_size = 100
    if len(sys.argv) > 2:
        max_size = int(sys.argv[2])
    split_by_file_size(input_file, max_size)
智能分割(保持数据完整性)
#!/usr/bin/env python3
# smart_split.py
import os
import re
import sys
from pathlib import Path
def smart_split(input_file, max_lines=50000):
    """Split a UTF-8 text file by line count, preferring record boundaries.

    For ``.csv``, ``.json`` and ``.log`` inputs a regular expression describes
    what the first line of a record looks like. When the buffer reaches
    ``max_lines``, the second half of the buffer is scanned backwards for the
    last line that looks like a record start; the part is cut just before
    that line and the line (plus anything after it) is carried over to the
    next part. If no such line is found, or the extension is unknown, the
    buffer is written as-is.

    Parts keep the input's extension and are written to a
    ``<stem>_smart_split`` directory next to the input file.

    Args:
        input_file: Path of the file to split.
        max_lines: Number of buffered lines that triggers a split.

    Returns:
        The number of part files written.

    Raises:
        FileNotFoundError: If ``input_file`` does not exist. The check runs
            before the output directory is created.
    """
    input_path = Path(input_file)
    if not input_path.exists():
        raise FileNotFoundError(f"文件 {input_file} 不存在")

    base_name = input_path.stem
    output_dir = input_path.parent / f"{base_name}_smart_split"
    output_dir.mkdir(exist_ok=True)

    # The file type is inferred from the extension only.
    ext = input_path.suffix.lower()
    # Patterns that recognise the first line of a record, per format.
    boundary_patterns = {
        '.csv': r'^[\d\w]',  # a CSV row usually starts with data
        '.json': r'^[{[]?[\s]*["\d{]',  # start of a JSON object
        '.log': r'^\[\d{4}-\d{2}-\d{2}',  # log line starting with a timestamp
    }
    # Unknown extensions fall back to plain whole-line splitting.
    pattern = boundary_patterns.get(ext, None)
    # Compile once; the pattern is probed repeatedly inside the read loop.
    boundary = re.compile(pattern) if pattern else None

    file_count = 0
    current_lines = []

    def _write_part(lines):
        """Write ``lines`` as the next numbered part file."""
        nonlocal file_count
        file_count += 1
        output_file = output_dir / f"{base_name}_part_{file_count:04d}{ext}"
        with open(output_file, 'w', encoding='utf-8', newline='') as out:
            out.writelines(lines)
        print(f"已创建: {output_file} ({len(lines)} 行)")

    # newline='' preserves the source's line endings in every part.
    with open(input_file, 'r', encoding='utf-8', newline='') as f:
        for line in f:
            current_lines.append(line)
            if len(current_lines) < max_lines:
                continue

            # Default: cut after the last buffered line.
            split_point = len(current_lines)
            if boundary is not None:
                # Look backwards through the second half only, so a part
                # never shrinks below half of the buffer.
                for i in range(len(current_lines) - 1, len(current_lines) // 2, -1):
                    if boundary.match(current_lines[i]):
                        split_point = i
                        break

            _write_part(current_lines[:split_point])
            current_lines = current_lines[split_point:]

    # Flush whatever is left after the last split.
    if current_lines:
        _write_part(current_lines)

    print(f"完成! 共分割为 {file_count} 个文件")
    return file_count
if __name__ == "__main__":
    # Command line: <file> [lines-per-part]; the line count defaults to 50000.
    if not sys.argv[1:]:
        print("用法: python smart_split.py <文件名> [行数]")
        sys.exit(1)
    input_file = sys.argv[1]
    lines_per_file = 50000
    if len(sys.argv) > 2:
        lines_per_file = int(sys.argv[2])
    smart_split(input_file, lines_per_file)
Shell脚本版本(Linux/Unix)
#!/bin/bash
# split_large_file.sh
# Split a text file into numbered parts of a fixed number of lines using
# the split command. Parts are written to <dir>/<name>_split/ and named
# <name>_part_NNNN.txt.
# NOTE(review): -d and --additional-suffix are GNU split options; confirm
# the target platform ships GNU coreutils.

# Check arguments
if [ $# -lt 1 ]; then
    echo "用法: $0 <文件名> [行数]"
    exit 1
fi

INPUT_FILE="$1"
# Deliberately not called LINES: bash maintains a LINES variable itself
# (terminal height) and, with checkwinsize enabled, may overwrite it after
# an external command runs, which would silently change the split size.
LINES_PER_FILE=${2:-100000}

# Check that the input file exists
if [ ! -f "$INPUT_FILE" ]; then
    echo "错误: 文件 $INPUT_FILE 不存在"
    exit 1
fi

# Create the output directory; "--" keeps a file name that starts with "-"
# from being parsed as an option.
BASE_NAME=$(basename -- "$INPUT_FILE" .txt)
OUTPUT_DIR="$(dirname -- "$INPUT_FILE")/${BASE_NAME}_split"
mkdir -p -- "$OUTPUT_DIR"

# Split with numeric, 4-digit suffixes followed by .txt
split -l "$LINES_PER_FILE" \
    -d \
    -a 4 \
    --additional-suffix=.txt \
    -- \
    "$INPUT_FILE" \
    "${OUTPUT_DIR}/${BASE_NAME}_part_"

echo "完成! 文件分割到 $OUTPUT_DIR 目录"
增强版:支持多种格式
#!/usr/bin/env python3
# universal_splitter.py
import os
import sys
import json
import csv
from pathlib import Path
from typing import List, Optional
class FileSplitter:
    """Split one input file into numbered parts inside ``<stem>_parts``.

    The output directory is created next to the input file as soon as the
    splitter is constructed.
    """

    def __init__(self, input_file: str):
        """Remember the input path and create the output directory.

        Args:
            input_file: Path of the file to split.

        Raises:
            FileNotFoundError: If ``input_file`` does not exist.
        """
        self.input_file = Path(input_file)
        if not self.input_file.exists():
            raise FileNotFoundError(f"文件 {input_file} 不存在")
        self.base_name = self.input_file.stem
        self.output_dir = self.input_file.parent / f"{self.base_name}_parts"
        self.output_dir.mkdir(exist_ok=True)

    def split_by_lines(self, lines_per_file: int = 100000) -> int:
        """Split the input as plain text, ``lines_per_file`` lines per part.

        Returns:
            The number of part files written.
        """
        file_count = 0
        current_batch = []
        # newline='' keeps the source's line endings instead of translating
        # them, so the parts concatenate back to the original text.
        with open(self.input_file, 'r', encoding='utf-8', newline='') as f:
            for line in f:
                current_batch.append(line)
                if len(current_batch) >= lines_per_file:
                    file_count += 1
                    self._write_batch(current_batch, file_count)
                    current_batch = []
        # Leftover lines form the final, shorter part.
        if current_batch:
            file_count += 1
            self._write_batch(current_batch, file_count)
        return file_count

    def split_csv_with_header(self, lines_per_file: int = 100000) -> int:
        """Split the input as CSV, repeating the header row in every part.

        ``lines_per_file`` counts parsed data rows, not physical lines, so a
        quoted field that spans several lines still counts as one row.

        Returns:
            The number of part files written.
        """
        file_count = 0
        header = None
        current_batch = []
        # The csv module requires newline='' on its file objects; without it
        # line breaks embedded in quoted fields are altered while reading.
        with open(self.input_file, 'r', encoding='utf-8', newline='') as f:
            reader = csv.reader(f)
            # The first row is treated as the header; None for an empty file.
            header = next(reader, None)
            for row in reader:
                current_batch.append(row)
                if len(current_batch) >= lines_per_file:
                    file_count += 1
                    self._write_csv_batch(header, current_batch, file_count)
                    current_batch = []
        if current_batch:
            file_count += 1
            self._write_csv_batch(header, current_batch, file_count)
        return file_count

    def _write_batch(self, lines: List[str], batch_num: int):
        """Write ``lines`` to ``<stem>_part_<batch_num>.txt`` and report it."""
        output_file = self.output_dir / f"{self.base_name}_part_{batch_num:04d}.txt"
        with open(output_file, 'w', encoding='utf-8', newline='') as f:
            f.writelines(lines)
        size_mb = output_file.stat().st_size / 1024 / 1024
        print(f"创建文件: {output_file} ({len(lines)} 行, {size_mb:.2f} MB)")

    def _write_csv_batch(self, header: Optional[List], rows: List[List], batch_num: int):
        """Write ``rows`` to ``<stem>_part_<batch_num>.csv``, header first.

        The header row is skipped when it is ``None`` or empty.
        """
        output_file = self.output_dir / f"{self.base_name}_part_{batch_num:04d}.csv"
        with open(output_file, 'w', newline='', encoding='utf-8') as f:
            writer = csv.writer(f)
            if header:
                writer.writerow(header)
            writer.writerows(rows)
        size_mb = output_file.stat().st_size / 1024 / 1024
        print(f"创建CSV: {output_file} ({len(rows)} 行, {size_mb:.2f} MB)")
def main():
    """Command-line entry point: ``<file> [lines] [format]``.

    ``lines`` defaults to 100000 and ``format`` to ``auto``, which picks the
    CSV splitter for ``.csv`` inputs and the plain-text splitter otherwise.
    Every failure is reported as a one-line message and ends the process
    with exit status 1.
    """
    if len(sys.argv) < 2:
        print("用法: python universal_splitter.py <文件名> [行数] [格式]")
        print("格式: auto, csv, text (默认 auto)")
        sys.exit(1)

    input_file = sys.argv[1]
    file_format = sys.argv[3] if len(sys.argv) > 3 else "auto"

    # Validate the arguments up front so a typo produces a short message
    # instead of a traceback or a silently chosen splitting mode.
    try:
        lines_per_file = int(sys.argv[2]) if len(sys.argv) > 2 else 100000
    except ValueError:
        print(f"错误: 行数必须是整数: {sys.argv[2]}")
        sys.exit(1)
    if file_format not in ("auto", "csv", "text"):
        print(f"错误: 不支持的格式: {file_format}")
        print("格式: auto, csv, text (默认 auto)")
        sys.exit(1)

    try:
        splitter = FileSplitter(input_file)
        # Auto-detect the format from the file extension.
        if file_format == "auto":
            ext = Path(input_file).suffix.lower()
            file_format = "csv" if ext == '.csv' else "text"
        # Run the matching splitter.
        if file_format == "csv":
            file_count = splitter.split_csv_with_header(lines_per_file)
        else:
            file_count = splitter.split_by_lines(lines_per_file)
        print(f"\n完成! 文件已分割为 {file_count} 个部分")
        print(f"输出目录: {splitter.output_dir}")
    except Exception as e:
        # Top-level boundary: report the problem and exit non-zero.
        print(f"错误: {e}")
        sys.exit(1)


if __name__ == "__main__":
    main()
使用示例
# 基本用法
python split_large_file.py large_log.txt 50000
# 按大小分割
python split_by_size.py data.csv 200
# 智能分割(保持数据完整)
python smart_split.py logfile.json 30000
# Shell版本
./split_large_file.sh data.txt 100000
# 通用分割器
python universal_splitter.py large_dataset.csv 50000 csv
性能优化建议
- 大文件处理:使用 `with` 语句自动管理文件句柄
- 内存控制:分批读取,避免一次性加载整个文件
- 编码处理:统一使用 UTF-8 编码
- 错误处理:完善的异常捕获和用户提示
- 进度显示:添加进度条或日志输出
选择适合你需求的脚本,或者根据需要组合使用。