Python案例如何实现多线程下载?

wen python案例 8

本文目录导读:

Python案例如何实现多线程下载?

  1. 基础多线程下载示例
  2. 带进度显示的多线程下载
  3. 支持断点续传的下载器
  4. 高级功能:多源多线程下载
  5. 完整使用示例
  6. 使用建议

我来为您详细介绍Python多线程下载的实现方法,包括基础案例和进阶优化。

基础多线程下载示例

简单的文件分块下载

import requests
import threading
import os
import time
class DownloadThread(threading.Thread):
    """Worker thread that downloads one inclusive byte range of a file.

    Sends ``Range: bytes=start_pos-end_pos`` and writes the body into
    ``file_name`` starting at offset ``start_pos``. The target file must
    already exist (it is opened with ``'rb+'``).

    Attributes:
        downloaded: number of bytes this thread has written so far.
        error: the exception that stopped the download, or ``None``.
    """

    def __init__(self, url, start_pos, end_pos, file_name, thread_id, timeout=30):
        super().__init__()
        self.url = url
        self.start_pos = start_pos
        self.end_pos = end_pos
        self.file_name = file_name
        self.thread_id = thread_id
        # Seconds to wait for connect/read before the request fails.
        self.timeout = timeout
        self.downloaded = 0
        self.error = None

    @property
    def expected_size(self):
        """Number of bytes in the assigned inclusive range."""
        return self.end_pos - self.start_pos + 1

    @property
    def success(self):
        """True once the whole range was written without error."""
        return self.error is None and self.downloaded == self.expected_size

    def run(self):
        """Fetch the assigned range and write it at its offset in the file."""
        headers = {'Range': f'bytes={self.start_pos}-{self.end_pos}'}
        try:
            # The context manager releases the connection even on error.
            with requests.get(self.url, headers=headers, stream=True,
                              timeout=self.timeout) as response:
                response.raise_for_status()
                # HTTP 200 means the server ignored Range and is sending the
                # whole file; writing that at start_pos would corrupt it.
                if response.status_code != 206:
                    raise RuntimeError(
                        f"服务器不支持分块下载 (HTTP {response.status_code})")
                with open(self.file_name, 'rb+') as f:
                    f.seek(self.start_pos)
                    remaining = self.expected_size
                    for chunk in response.iter_content(chunk_size=1024):
                        if not chunk:
                            continue
                        # Never write past end_pos into a neighbour's range.
                        chunk = chunk[:remaining]
                        f.write(chunk)
                        self.downloaded += len(chunk)
                        remaining -= len(chunk)
                        if remaining <= 0:
                            break
            if self.downloaded != self.expected_size:
                raise RuntimeError(
                    f"数据不完整: 收到 {self.downloaded}/{self.expected_size} 字节")
            print(f"线程 {self.thread_id} 下载完成 ({self.start_pos}-{self.end_pos})")
        except Exception as e:
            # Keep the failure visible to the coordinating code.
            self.error = e
            print(f"线程 {self.thread_id} 下载失败: {e}")
class MultiThreadDownloader:
    """Coordinate several DownloadThread workers to fetch one file in parallel.

    The file is split into contiguous inclusive byte ranges, one per thread.
    Each range is written in place into an output file pre-sized to the
    full length.
    """

    def __init__(self, url, num_threads=4, output_file=None, timeout=30):
        from urllib.parse import urlsplit

        self.url = url
        self.num_threads = num_threads
        # Use only the URL path so "a.zip?token=x" is saved as "a.zip";
        # fall back to a fixed name when the path has no basename.
        self.output_file = (output_file
                            or os.path.basename(urlsplit(url).path)
                            or "downloaded_file")
        self.timeout = timeout
        self.file_size = 0
        self.threads = []

    def get_file_size(self):
        """Query the size via HEAD and store it in ``self.file_size``.

        ``requests.head`` does not follow redirects by default, so redirects
        are enabled explicitly. Otherwise the size of the redirect response
        would be reported.
        """
        response = requests.head(self.url, allow_redirects=True, timeout=self.timeout)
        response.raise_for_status()
        self.file_size = int(response.headers.get('content-length', 0))
        return self.file_size

    @staticmethod
    def _compute_ranges(file_size, num_threads):
        """Split ``[0, file_size)`` into at most ``num_threads`` inclusive ranges.

        Fewer ranges are returned when the file has fewer bytes than threads.
        The last range absorbs the remainder.
        """
        if file_size <= 0:
            return []
        count = max(1, min(num_threads, file_size))
        base = file_size // count
        ranges = []
        for i in range(count):
            start = i * base
            end = file_size - 1 if i == count - 1 else start + base - 1
            ranges.append((start, end))
        return ranges

    def download(self):
        """Download the file with one thread per range.

        Calls ``get_file_size`` first if it has not been called yet.

        Returns:
            True if every byte was received, otherwise False.

        Raises:
            ValueError: if the server does not report a positive size.
        """
        if self.file_size <= 0:
            self.get_file_size()
        if self.file_size <= 0:
            raise ValueError("无法获取文件大小 (缺少 Content-Length), 无法分块下载")
        print(f"开始下载: {self.url}")
        print(f"文件大小: {self.file_size / 1024 / 1024:.2f} MB")
        # Pre-size the file so every thread can seek to its own offset.
        with open(self.output_file, 'wb') as f:
            f.truncate(self.file_size)
        # Start from a clean list so repeated calls don't join stale threads.
        self.threads = []
        start_time = time.perf_counter()
        for i, (start, end) in enumerate(self._compute_ranges(self.file_size, self.num_threads)):
            thread = DownloadThread(self.url, start, end, self.output_file, i)
            self.threads.append(thread)
            thread.start()
            print(f"线程 {i} 启动: 下载范围 {start}-{end}")
        for thread in self.threads:
            thread.join()
        elapsed_time = time.perf_counter() - start_time
        received = sum(thread.downloaded for thread in self.threads)
        if received != self.file_size:
            print(f"\n下载未完成: 收到 {received}/{self.file_size} 字节")
            return False
        print(f"\n下载完成!耗时: {elapsed_time:.2f} 秒")
        if elapsed_time > 0:
            print(f"平均速度: {self.file_size / 1024 / 1024 / elapsed_time:.2f} MB/s")
        return True
# Usage example
if __name__ == "__main__":
    # Example URL (replace with a real download link).
    url = "https://example.com/large-file.zip"
    # Download with 4 threads.
    downloader = MultiThreadDownloader(url, num_threads=4)
    # Byte ranges can only be computed when the server reports a size.
    if downloader.get_file_size() > 0:
        downloader.download()
    else:
        print("无法获取文件大小,请检查URL")

带进度显示的多线程下载

import requests
import threading
import os
import time
from tqdm import tqdm
from concurrent.futures import ThreadPoolExecutor
class ProgressDownloader:
    """Multithreaded downloader that reports progress through one tqdm bar.

    Each worker fetches one byte range with an HTTP ``Range`` request and
    writes it in place into a pre-sized file. All workers update the shared
    counter and progress bar under ``self.lock``.
    """

    def __init__(self, url, num_threads=4, timeout=10):
        from urllib.parse import urlsplit

        self.url = url
        self.num_threads = num_threads
        # Per-request timeout in seconds (the chunk requests previously used 10).
        self.timeout = timeout
        # Use only the URL path so "file.zip?token=..." is saved as "file.zip".
        self.file_name = os.path.basename(urlsplit(url).path) or "downloaded_file"
        self.file_size = 0
        self.downloaded = 0
        self.lock = threading.Lock()
        self.pbar = None

    def get_file_size(self):
        """Query the size via HEAD and store it in ``self.file_size``.

        Redirects are followed explicitly because ``requests.head`` does not
        follow them by default.
        """
        response = requests.head(self.url, allow_redirects=True, timeout=self.timeout)
        response.raise_for_status()
        self.file_size = int(response.headers.get('content-length', 0))
        return self.file_size

    @staticmethod
    def _compute_ranges(file_size, num_threads):
        """Split ``[0, file_size)`` into at most ``num_threads`` inclusive ranges."""
        if file_size <= 0:
            return []
        count = max(1, min(num_threads, file_size))
        base = file_size // count
        ranges = []
        for i in range(count):
            start = i * base
            end = file_size - 1 if i == count - 1 else start + base - 1
            ranges.append((start, end))
        return ranges

    def download_chunk(self, start, end, thread_id):
        """Download bytes ``start..end`` (inclusive) into the pre-sized file.

        Returns:
            True if the whole range was written, False on any failure.
        """
        headers = {'Range': f'bytes={start}-{end}'}
        expected = end - start + 1
        received = 0
        try:
            with requests.get(self.url, headers=headers, stream=True,
                              timeout=self.timeout) as response:
                response.raise_for_status()
                # HTTP 200 means Range was ignored and the whole file is coming.
                if response.status_code != 206:
                    raise RuntimeError(
                        f"服务器不支持分块下载 (HTTP {response.status_code})")
                with open(self.file_name, 'r+b') as f:
                    f.seek(start)
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        # Never write past `end` into another thread's range.
                        chunk = chunk[:expected - received]
                        f.write(chunk)
                        received += len(chunk)
                        with self.lock:
                            self.downloaded += len(chunk)
                            if self.pbar is not None:
                                self.pbar.update(len(chunk))
                        if received >= expected:
                            break
            if received != expected:
                raise RuntimeError(f"数据不完整: {received}/{expected} 字节")
            return True
        except Exception as e:
            print(f"线程 {thread_id} 下载失败: {e}")
            return False

    def download_with_progress(self):
        """Download the file with a thread pool while showing a progress bar.

        Calls ``get_file_size`` first if it has not been called yet.

        Returns:
            True if every range was downloaded completely, otherwise False.

        Raises:
            ValueError: if the server does not report a positive size.
        """
        if self.file_size <= 0:
            self.get_file_size()
        if self.file_size <= 0:
            raise ValueError("无法获取文件大小 (缺少 Content-Length), 无法分块下载")
        ranges = self._compute_ranges(self.file_size, self.num_threads)
        print(f"开始下载: {self.url}")
        print(f"文件大小: {self.file_size / 1024 / 1024:.2f} MB")
        print(f"使用线程数: {len(ranges)}")
        # Pre-size the file so every worker can seek to its own offset.
        with open(self.file_name, 'wb') as f:
            f.truncate(self.file_size)
        self.downloaded = 0
        with tqdm(total=self.file_size, unit='B', unit_scale=True, desc="下载进度") as pbar:
            self.pbar = pbar
            try:
                with ThreadPoolExecutor(max_workers=len(ranges)) as executor:
                    futures = [
                        executor.submit(self.download_chunk, start, end, thread_id)
                        for thread_id, (start, end) in enumerate(ranges)
                    ]
                    results = [future.result() for future in futures]
            finally:
                # Do not keep a reference to a bar that is about to be closed.
                self.pbar = None
        failed = [thread_id for thread_id, ok in enumerate(results) if not ok]
        if failed:
            print(f"\n下载失败: 线程 {failed} 未完成, 文件 {self.file_name} 不完整")
            return False
        print(f"\n下载完成!文件保存为: {self.file_name}")
        return True
# Usage example
if __name__ == "__main__":
    # Example URL (replace with a real download link).
    url = "https://example.com/large-file.zip"
    downloader = ProgressDownloader(url, num_threads=4)
    # Byte ranges can only be computed when the server reports a size.
    if downloader.get_file_size() > 0:
        downloader.download_with_progress()
    else:
        print("无法获取文件大小,请检查URL")

支持断点续传的下载器

import requests
import threading
import os
import json
from datetime import datetime
class ResumeDownloader:
    """Multithreaded downloader that can resume after an interruption.

    While downloading, per-range progress is checkpointed to
    ``<file>.status`` (JSON). On the next run, if that checkpoint belongs to
    the same URL and the pre-sized data file is still present, each range
    resumes after the last byte it had flushed to disk. A resumed download
    reuses the saved ranges, so ``num_threads`` only applies to new downloads.
    """

    # Persist the checkpoint after roughly this many new bytes per thread.
    SAVE_INTERVAL = 1024 * 1024
    # Seconds to wait for connect/read before a request fails.
    TIMEOUT = 30

    def __init__(self, url, num_threads=4, save_dir="downloads"):
        from urllib.parse import urlsplit

        self.url = url
        self.num_threads = num_threads
        self.save_dir = save_dir
        # Use only the URL path so query strings don't end up in the file name.
        self.file_name = os.path.basename(urlsplit(url).path) or "downloaded_file"
        self.file_path = os.path.join(save_dir, self.file_name)
        self.status_file = self.file_path + ".status"
        self.file_size = 0
        # Guards mutation of thread_status entries and status-file writes.
        self.lock = threading.Lock()
        # Progress list of the download currently running (set by download()).
        self._thread_status = None
        os.makedirs(save_dir, exist_ok=True)

    def save_status(self, thread_status):
        """Write the checkpoint (url, size, per-range progress) to the status file.

        The JSON is written to a temp file and atomically renamed, so an
        interruption never leaves a truncated checkpoint behind.
        """
        status = {
            'url': self.url,
            'file_size': self.file_size,
            'thread_status': thread_status,
            'timestamp': datetime.now().isoformat()
        }
        tmp_path = self.status_file + ".tmp"
        with open(tmp_path, 'w') as f:
            json.dump(status, f)
        os.replace(tmp_path, self.status_file)

    def load_status(self):
        """Return the saved checkpoint dict, or None if absent or unreadable."""
        if not os.path.exists(self.status_file):
            return None
        try:
            with open(self.status_file, 'r') as f:
                return json.load(f)
        except (OSError, ValueError):
            # Unreadable or corrupt JSON: treat as "no checkpoint".
            return None

    def get_file_size(self):
        """Query the size via HEAD (following redirects) into ``self.file_size``."""
        response = requests.head(self.url, allow_redirects=True, timeout=self.TIMEOUT)
        response.raise_for_status()
        self.file_size = int(response.headers.get('content-length', 0))
        return self.file_size

    @staticmethod
    def _compute_ranges(file_size, num_threads):
        """Split ``[0, file_size)`` into at most ``num_threads`` inclusive ranges."""
        if file_size <= 0:
            return []
        count = max(1, min(num_threads, file_size))
        base = file_size // count
        ranges = []
        for i in range(count):
            start = i * base
            end = file_size - 1 if i == count - 1 else start + base - 1
            ranges.append((start, end))
        return ranges

    def _status_is_resumable(self, status):
        """Return True if a loaded checkpoint can be used to resume this download."""
        if not isinstance(status, dict) or status.get('url') != self.url:
            return False
        file_size = status.get('file_size')
        thread_status = status.get('thread_status')
        if not isinstance(file_size, int) or file_size <= 0 or not thread_status:
            return False
        for record in thread_status:
            if not isinstance(record, dict) or not all(
                    isinstance(record.get(key), int)
                    for key in ('start', 'end', 'downloaded')):
                return False
        try:
            return os.path.getsize(self.file_path) == file_size
        except OSError:
            # The partially written data file is gone; start over.
            return False

    def _commit_progress(self, record, nbytes):
        """Credit ``nbytes`` already flushed to disk to ``record`` and checkpoint."""
        with self.lock:
            if record is not None:
                record['downloaded'] += nbytes
            if self._thread_status is not None:
                try:
                    self.save_status(self._thread_status)
                except OSError as e:
                    # Checkpointing is best-effort; the download itself continues.
                    print(f"保存进度失败: {e}")

    def download_chunk(self, start, end, thread_id, existing_data=0, record=None):
        """Download bytes ``start + existing_data`` .. ``end`` (inclusive).

        Args:
            start, end: inclusive byte range assigned to this thread.
            thread_id: label used in log messages.
            existing_data: bytes of this range already on disk; the download
                resumes right after them.
            record: optional thread_status entry whose ``'downloaded'`` count
                is advanced as data is flushed, so progress can be resumed.

        Returns:
            True if the remaining range was written completely, else False.
        """
        resume_from = start + existing_data
        if resume_from > end:
            # Range already complete (or empty): nothing to fetch.
            return True
        expected = end - resume_from + 1
        received = 0
        pending = 0  # bytes written but not yet credited to `record`
        headers = {'Range': f'bytes={resume_from}-{end}'}
        try:
            with requests.get(self.url, headers=headers, stream=True,
                              timeout=self.TIMEOUT) as response:
                response.raise_for_status()
                # HTTP 200 means Range was ignored; resuming is impossible.
                if response.status_code != 206:
                    raise RuntimeError(
                        f"服务器不支持断点续传 (HTTP {response.status_code})")
                with open(self.file_path, 'r+b') as f:
                    f.seek(resume_from)
                    for chunk in response.iter_content(chunk_size=8192):
                        if not chunk:
                            continue
                        chunk = chunk[:expected - received]
                        f.write(chunk)
                        received += len(chunk)
                        pending += len(chunk)
                        if pending >= self.SAVE_INTERVAL:
                            # Flush first so the checkpoint never claims bytes
                            # still sitting in this file object's buffer.
                            f.flush()
                            self._commit_progress(record, pending)
                            pending = 0
                        if received >= expected:
                            break
            if received != expected:
                raise RuntimeError(f"数据不完整: {received}/{expected} 字节")
            print(f"线程 {thread_id} 完成: {start}-{end}")
            return True
        except Exception as e:
            print(f"线程 {thread_id} 下载失败: {e}")
            return False
        finally:
            # The data file has been closed (hence flushed) by this point.
            if pending:
                self._commit_progress(record, pending)

    def download(self):
        """Run (or resume) the download.

        Returns:
            True when the file is complete and the checkpoint was removed.
            False when some range failed; progress is saved so that a later
            call can resume.

        Raises:
            ValueError: if a new download cannot determine the file size.
        """
        status = self.load_status()
        if status and self._status_is_resumable(status):
            print("检测到未完成的下载,尝试续传...")
            self.file_size = status['file_size']
            thread_status = status['thread_status']
        else:
            print("开始新下载...")
            self.get_file_size()
            if self.file_size <= 0:
                raise ValueError("无法获取文件大小 (缺少 Content-Length), 无法断点续传")
            with open(self.file_path, 'wb') as f:
                f.truncate(self.file_size)
            thread_status = [
                {'start': start, 'end': end, 'downloaded': 0}
                for start, end in self._compute_ranges(self.file_size, self.num_threads)
            ]
            # Checkpoint immediately so even an early interruption can resume.
            self.save_status(thread_status)
        self._thread_status = thread_status
        print(f"文件大小: {self.file_size / 1024 / 1024:.2f} MB")
        print(f"使用线程数: {len(thread_status)}")

        results = [False] * len(thread_status)

        def worker(index, record):
            results[index] = self.download_chunk(
                record['start'], record['end'], index, record['downloaded'],
                record=record)

        threads = [threading.Thread(target=worker, args=(i, record))
                   for i, record in enumerate(thread_status)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        self._thread_status = None

        if not all(results):
            # Keep the checkpoint so the next run continues where this stopped.
            self.save_status(thread_status)
            print(f"\n下载未完成, 进度已保存到 {self.status_file}, 重新运行即可续传")
            return False
        if os.path.exists(self.status_file):
            os.remove(self.status_file)
        print(f"\n下载完成!文件保存为: {self.file_path}")
        return True
# Usage example: the file is saved under ./downloads (the default save_dir).
if __name__ == "__main__":
    ResumeDownloader("https://example.com/large-file.zip", num_threads=4).download()

高级功能:多源多线程下载

import requests
import threading
import hashlib
from concurrent.futures import ThreadPoolExecutor, as_completed
class AdvancedDownloader:
    """Download one file in parallel byte ranges spread across mirror URLs.

    Ranges are assigned to mirrors round-robin. A range that fails on its
    mirror is retried on the others. Every finished range is written at its
    own offset, so the completion order does not matter.
    """

    def __init__(self, urls, num_threads=4, output_file="downloaded_file", timeout=30):
        self.urls = urls  # mirrors serving the identical file
        self.num_threads = num_threads
        self.output_file = output_file
        # Seconds to wait for connect/read before a request fails.
        self.timeout = timeout

    def verify_file_integrity(self, file_path, expected_hash=None):
        """Return True if the SHA-256 of file_path matches expected_hash.

        With no expected_hash the check is skipped and True is returned.
        The comparison is case-insensitive on the hex digest.
        """
        if not expected_hash:
            return True
        sha256_hash = hashlib.sha256()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(65536), b""):
                sha256_hash.update(chunk)
        return sha256_hash.hexdigest() == expected_hash.strip().lower()

    def download_from_source(self, url, start, end, output_file):
        """Fetch bytes ``start..end`` (inclusive) from url.

        ``output_file`` is unused; it is kept for backward compatibility.

        Returns:
            The bytes on success, or None on network/HTTP failure or when
            the mirror did not honour the Range request.
        """
        headers = {'Range': f'bytes={start}-{end}'}
        try:
            response = requests.get(url, headers=headers, timeout=self.timeout)
            response.raise_for_status()
        except requests.RequestException:
            return None
        # A mirror that ignores Range answers 200 with the whole file.
        if response.status_code != 206 or len(response.content) != end - start + 1:
            return None
        return response.content

    @staticmethod
    def _compute_ranges(file_size, num_threads):
        """Split ``[0, file_size)`` into at most ``num_threads`` inclusive ranges."""
        if file_size <= 0:
            return []
        count = max(1, min(num_threads, file_size))
        base = file_size // count
        ranges = []
        for i in range(count):
            start = i * base
            end = file_size - 1 if i == count - 1 else start + base - 1
            ranges.append((start, end))
        return ranges

    def _get_file_size(self):
        """Return the size reported by the first mirror that answers HEAD, else 0."""
        for url in self.urls:
            try:
                response = requests.head(url, allow_redirects=True, timeout=self.timeout)
                response.raise_for_status()
            except requests.RequestException:
                continue
            size = int(response.headers.get('content-length', 0))
            if size > 0:
                return size
        return 0

    def _fetch_with_failover(self, index, start, end):
        """Fetch one range, starting at its round-robin mirror and trying the rest."""
        count = len(self.urls)
        for attempt in range(count):
            url = self.urls[(index + attempt) % count]
            data = self.download_from_source(url, start, end, f"chunk_{index}")
            if data is not None:
                return data
        return None

    def download_with_multiple_sources(self, expected_hash=None):
        """Download the file from all mirrors in parallel.

        Args:
            expected_hash: optional SHA-256 hex digest to verify the result.

        Returns:
            True if every range was downloaded and the optional hash matched.

        Raises:
            ValueError: if no mirror is given or none reports a file size.
        """
        if not self.urls:
            raise ValueError("至少需要一个下载源")
        print(f"使用 {len(self.urls)} 个下载源")
        file_size = self._get_file_size()
        if file_size <= 0:
            raise ValueError("无法从任何下载源获取文件大小")
        chunks = self._compute_ranges(file_size, self.num_threads)
        # Pre-size the file so each finished chunk can be written at its offset.
        with open(self.output_file, 'wb') as f:
            f.truncate(file_size)
        failed = []
        with ThreadPoolExecutor(max_workers=len(chunks)) as executor, \
                open(self.output_file, 'r+b') as f:
            futures = {
                executor.submit(self._fetch_with_failover, i, start, end): (i, start)
                for i, (start, end) in enumerate(chunks)
            }
            # as_completed yields in finish order, so place each chunk by offset
            # instead of appending (appending would scramble the file).
            for future in as_completed(futures):
                index, start = futures[future]
                data = future.result()
                if data is None:
                    failed.append(index)
                    continue
                f.seek(start)
                f.write(data)
        if failed:
            print(f"下载失败: 分块 {sorted(failed)} 在所有下载源上都失败了")
            return False
        if not self.verify_file_integrity(self.output_file, expected_hash):
            print("文件校验失败: SHA-256 不匹配")
            return False
        print(f"下载完成!文件保存为: {self.output_file}")
        return True
# Usage example
if __name__ == "__main__":
    # Three mirrors of the same file; chunks are assigned to them in turn.
    urls = [f"https://mirror{n}.example.com/large-file.zip" for n in (1, 2, 3)]
    AdvancedDownloader(urls, num_threads=6).download_with_multiple_sources()

完整使用示例

# Full example: download a test file
def test_download():
    """Demo run: fetch a sample file with ProgressDownloader and report speed.

    Uses ProgressDownloader and the ``time`` module from the earlier
    sections of this script. Prints a message instead of downloading when
    the server does not report a positive size.
    """
    # NOTE(review): this public test URL may no longer be served; replace it
    # with a reachable one.
    test_url = "https://speed.hetzner.de/100MB.bin"
    banner = "=" * 50
    for line in (banner, "Python多线程下载测试", banner):
        print(line)

    print("\n1. 测试带进度条的下载器")
    downloader = ProgressDownloader(test_url, num_threads=8)
    file_size = downloader.get_file_size()
    if file_size <= 0:
        print("无法获取文件大小,请检查URL")
        return

    began = time.time()
    downloader.download_with_progress()
    elapsed = time.time() - began
    print(f"下载速度: {file_size / 1024 / 1024 / elapsed:.2f} MB/s")
    print(f"耗时: {elapsed:.2f} 秒")
if __name__ == "__main__":
    # Dependencies: pip install requests tqdm
    test_download()

使用建议

  1. 选择合适的线程数

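    • 前提:服务器必须支持 Range 请求(响应头 Accept-Ranges: bytes,分块请求返回 206),否则无法分块下载
    • 通常 4-8 个线程即可,最佳值取决于服务器限制和网络带宽
    • 过多线程可能触发服务器的限流或反爬机制
    • 考虑网络带宽限制:带宽跑满后,继续增加线程也无法提速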
  2. 错误处理

    • 添加重试机制
    • 实现断点续传
    • 记录下载日志
  3. 性能优化

    • 使用 requests.Session 复用 TCP 连接(连接池)
    • 调整缓冲区大小
    • 考虑使用异步IO
  4. 注意事项

    • 遵守网站的下载规则
    • 注意版权问题
    • 添加合理的延时避免对服务器造成压力

这些示例提供了从基础到高级的多线程下载实现,您可以根据实际需求选择合适的方案。

抱歉,评论功能暂时关闭!