本文目录导读:

我来为您详细介绍Python多线程下载的实现方法,包括基础案例和进阶优化。
基础多线程下载示例
简单的文件分块下载
import requests
import threading
import os
import time
class DownloadThread(threading.Thread):
"""下载线程类"""
def __init__(self, url, start_pos, end_pos, file_name, thread_id):
super().__init__()
self.url = url
self.start_pos = start_pos
self.end_pos = end_pos
self.file_name = file_name
self.thread_id = thread_id
self.downloaded = 0
def run(self):
"""线程执行方法"""
headers = {'Range': f'bytes={self.start_pos}-{self.end_pos}'}
try:
response = requests.get(self.url, headers=headers, stream=True)
response.raise_for_status()
# 写入文件指定位置
with open(self.file_name, 'rb+') as f:
f.seek(self.start_pos)
for chunk in response.iter_content(chunk_size=1024):
if chunk:
f.write(chunk)
self.downloaded += len(chunk)
print(f"线程 {self.thread_id} 下载完成 ({self.start_pos}-{self.end_pos})")
except Exception as e:
print(f"线程 {self.thread_id} 下载失败: {e}")
class MultiThreadDownloader:
"""多线程下载管理器"""
def __init__(self, url, num_threads=4, output_file=None):
self.url = url
self.num_threads = num_threads
self.output_file = output_file or os.path.basename(url)
self.file_size = 0
self.threads = []
def get_file_size(self):
"""获取文件大小"""
response = requests.head(self.url)
response.raise_for_status()
self.file_size = int(response.headers.get('content-length', 0))
return self.file_size
def download(self):
"""执行多线程下载"""
print(f"开始下载: {self.url}")
print(f"文件大小: {self.file_size / 1024 / 1024:.2f} MB")
# 创建空文件
with open(self.output_file, 'wb') as f:
f.truncate(self.file_size)
# 计算每个线程的下载范围
chunk_size = self.file_size // self.num_threads
# 创建并启动线程
start_time = time.time()
for i in range(self.num_threads):
start = i * chunk_size
end = start + chunk_size - 1 if i < self.num_threads - 1 else self.file_size - 1
thread = DownloadThread(self.url, start, end, self.output_file, i)
self.threads.append(thread)
thread.start()
print(f"线程 {i} 启动: 下载范围 {start}-{end}")
# 等待所有线程完成
for thread in self.threads:
thread.join()
elapsed_time = time.time() - start_time
print(f"\n下载完成!耗时: {elapsed_time:.2f} 秒")
print(f"平均速度: {self.file_size / 1024 / 1024 / elapsed_time:.2f} MB/s")
# 使用示例
if __name__ == "__main__":
# 示例URL(请替换为实际下载链接)
url = "https://example.com/large-file.zip"
# 使用4个线程下载
downloader = MultiThreadDownloader(url, num_threads=4)
file_size = downloader.get_file_size()
downloader.download()
带进度显示的多线程下载
import requests
import threading
import os
import time
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
class ProgressDownloader:
"""带进度显示的多线程下载器"""
def __init__(self, url, num_threads=4):
self.url = url
self.num_threads = num_threads
self.file_name = os.path.basename(url)
self.file_size = 0
self.downloaded = 0
self.lock = threading.Lock()
self.pbar = None
def get_file_size(self):
"""获取文件大小"""
response = requests.head(self.url)
self.file_size = int(response.headers.get('content-length', 0))
return self.file_size
def download_chunk(self, start, end, thread_id):
"""下载文件块"""
headers = {'Range': f'bytes={start}-{end}'}
try:
response = requests.get(self.url, headers=headers, stream=True, timeout=10)
with open(self.file_name, 'r+b') as f:
f.seek(start)
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
with self.lock:
self.downloaded += len(chunk)
if self.pbar:
self.pbar.update(len(chunk))
return True
except Exception as e:
print(f"线程 {thread_id} 下载失败: {e}")
return False
def download_with_progress(self):
"""带进度条的多线程下载"""
print(f"开始下载: {self.url}")
print(f"文件大小: {self.file_size / 1024 / 1024:.2f} MB")
print(f"使用线程数: {self.num_threads}")
# 创建空文件
with open(self.file_name, 'wb') as f:
f.truncate(self.file_size)
# 计算分块
chunk_size = self.file_size // self.num_threads
ranges = []
for i in range(self.num_threads):
start = i * chunk_size
end = start + chunk_size - 1 if i < self.num_threads - 1 else self.file_size - 1
ranges.append((start, end, i))
# 创建进度条
with tqdm(total=self.file_size, unit='B', unit_scale=True, desc="下载进度") as self.pbar:
# 使用线程池执行下载
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
futures = []
for start, end, thread_id in ranges:
future = executor.submit(self.download_chunk, start, end, thread_id)
futures.append(future)
# 等待所有任务完成
for future in futures:
future.result()
print(f"\n下载完成!文件保存为: {self.file_name}")
# 使用示例
if __name__ == "__main__":
# 示例URL
url = "https://example.com/large-file.zip"
downloader = ProgressDownloader(url, num_threads=4)
file_size = downloader.get_file_size()
downloader.download_with_progress()
支持断点续传的下载器
import requests
import threading
import os
import json
from datetime import datetime
class ResumeDownloader:
"""支持断点续传的多线程下载器"""
def __init__(self, url, num_threads=4, save_dir="downloads"):
self.url = url
self.num_threads = num_threads
self.save_dir = save_dir
self.file_name = os.path.basename(url) or "downloaded_file"
self.file_path = os.path.join(save_dir, self.file_name)
self.status_file = self.file_path + ".status"
self.file_size = 0
self.lock = threading.Lock()
# 创建下载目录
os.makedirs(save_dir, exist_ok=True)
def save_status(self, thread_status):
"""保存下载状态"""
status = {
'url': self.url,
'file_size': self.file_size,
'thread_status': thread_status,
'timestamp': datetime.now().isoformat()
}
with open(self.status_file, 'w') as f:
json.dump(status, f)
def load_status(self):
"""加载下载状态"""
if not os.path.exists(self.status_file):
return None
try:
with open(self.status_file, 'r') as f:
status = json.load(f)
return status
except:
return None
def get_file_size(self):
"""获取文件大小"""
response = requests.head(self.url)
self.file_size = int(response.headers.get('content-length', 0))
return self.file_size
def download_chunk(self, start, end, thread_id, existing_data=0):
"""下载文件块(支持断点续传)"""
if start >= end:
return True
headers = {'Range': f'bytes={start + existing_data}-{end}'}
try:
response = requests.get(self.url, headers=headers, stream=True)
with open(self.file_path, 'r+b') as f:
f.seek(start + existing_data)
for chunk in response.iter_content(chunk_size=8192):
if chunk:
f.write(chunk)
print(f"线程 {thread_id} 完成: {start}-{end}")
return True
except Exception as e:
print(f"线程 {thread_id} 下载失败: {e}")
return False
def download(self):
"""执行下载(支持断点续传)"""
# 检查是否有未完成的下载
status = self.load_status()
if status:
print("检测到未完成的下载,尝试续传...")
self.file_size = status['file_size']
thread_status = status['thread_status']
else:
print("开始新下载...")
self.get_file_size()
# 创建空文件
with open(self.file_path, 'wb') as f:
f.truncate(self.file_size)
# 初始化线程状态
chunk_size = self.file_size // self.num_threads
thread_status = []
for i in range(self.num_threads):
start = i * chunk_size
end = start + chunk_size - 1 if i < self.num_threads - 1 else self.file_size - 1
thread_status.append({
'start': start,
'end': end,
'downloaded': 0
})
print(f"文件大小: {self.file_size / 1024 / 1024:.2f} MB")
print(f"使用线程数: {self.num_threads}")
# 启动下载线程
threads = []
for i in range(self.num_threads):
t = threading.Thread(
target=self.download_chunk,
args=(
thread_status[i]['start'],
thread_status[i]['end'],
i,
thread_status[i]['downloaded']
)
)
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
# 删除状态文件(下载完成)
if os.path.exists(self.status_file):
os.remove(self.status_file)
print(f"\n下载完成!文件保存为: {self.file_path}")
# 使用示例
if __name__ == "__main__":
url = "https://example.com/large-file.zip"
downloader = ResumeDownloader(url, num_threads=4)
downloader.download()
高级功能:多源多线程下载
import requests
import threading
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
class AdvancedDownloader:
"""多源多线程下载器"""
def __init__(self, urls, num_threads=4, output_file="downloaded_file"):
self.urls = urls # 可以是多个镜像源
self.num_threads = num_threads
self.output_file = output_file
def verify_file_integrity(self, file_path, expected_hash=None):
"""验证文件完整性"""
if not expected_hash:
return True
sha256_hash = hashlib.sha256()
with open(file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest() == expected_hash
def download_from_source(self, url, start, end, output_file):
"""从特定源下载文件块"""
headers = {'Range': f'bytes={start}-{end}'}
try:
response = requests.get(url, headers=headers, timeout=30)
response.raise_for_status()
return response.content
except:
return None
def download_with_multiple_sources(self):
"""多源多线程下载"""
print(f"使用 {len(self.urls)} 个下载源")
# 获取文件大小
response = requests.head(self.urls[0])
file_size = int(response.headers.get('content-length', 0))
# 计算分块
chunk_size = file_size // self.num_threads
chunks = []
for i in range(self.num_threads):
start = i * chunk_size
end = start + chunk_size - 1 if i < self.num_threads - 1 else file_size - 1
chunks.append((start, end))
# 使用线程池下载
with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
futures = []
for i, (start, end) in enumerate(chunks):
# 轮流使用不同的源
source_url = self.urls[i % len(self.urls)]
future = executor.submit(
self.download_from_source,
source_url, start, end, f"chunk_{i}"
)
futures.append(future)
# 收集下载的数据
data_chunks = []
for future in as_completed(futures):
result = future.result()
if result:
data_chunks.append(result)
# 合并文件
with open(self.output_file, 'wb') as f:
for chunk in data_chunks:
f.write(chunk)
print(f"下载完成!文件保存为: {self.output_file}")
# 使用示例
if __name__ == "__main__":
# 多个下载源(镜像)
urls = [
"https://mirror1.example.com/large-file.zip",
"https://mirror2.example.com/large-file.zip",
"https://mirror3.example.com/large-file.zip"
]
downloader = AdvancedDownloader(urls, num_threads=6)
downloader.download_with_multiple_sources()
完整使用示例
# 完整示例:下载测试文件
def test_download():
"""测试下载功能"""
# 测试用的下载URL(请使用真实URL替换)
test_url = "https://speed.hetzner.de/100MB.bin"
print("=" * 50)
print("Python多线程下载测试")
print("=" * 50)
# 测试带进度条的下载
print("\n1. 测试带进度条的下载器")
downloader = ProgressDownloader(test_url, num_threads=8)
file_size = downloader.get_file_size()
if file_size > 0:
start_time = time.time()
downloader.download_with_progress()
elapsed = time.time() - start_time
print(f"下载速度: {file_size / 1024 / 1024 / elapsed:.2f} MB/s")
print(f"耗时: {elapsed:.2f} 秒")
else:
print("无法获取文件大小,请检查URL")
if __name__ == "__main__":
# 安装依赖
# pip install requests tqdm
test_download()
使用建议
-
选择合适的线程数:
- 4-8 个线程效果最好
- 过多线程可能导致反爬机制触发
- 考虑网络带宽限制
-
错误处理:
- 添加重试机制
- 实现断点续传
- 记录下载日志
-
性能优化:
- 使用连接池
- 调整缓冲区大小
- 考虑使用异步IO
-
注意事项:
- 遵守网站的下载规则
- 注意版权问题
- 添加合理的延时避免对服务器造成压力
这些示例提供了从基础到高级的多线程下载实现,您可以根据实际需求选择合适的方案。