Python案例怎么使用线程池?

wen python案例 8

本文目录导读:

Python案例怎么使用线程池?

  1. 基础线程池使用(ThreadPoolExecutor基础)
  2. 批量处理数据的案例
  3. map()函数的高效用法
  4. 实际应用:文件处理案例
  5. 带进度条的多线程处理
  6. 线程池的异常处理
  7. 实际应用:Web爬虫案例
  8. 线程池性能对比
  9. 使用建议

我来详细讲解Python线程池的使用方法,包含多个实用案例。

基础线程池使用(ThreadPoolExecutor基础)

基本示例

from concurrent.futures import ThreadPoolExecutor
import time
import random
def task(n, min_delay=1, max_delay=3):
    """Simulate a slow job: log its start and end, sleep, and return ``n * 2``.

    Args:
        n: Task identifier; it is also the value that gets doubled.
        min_delay: Lower bound, in whole seconds, of the simulated work time.
        max_delay: Upper bound (inclusive), in whole seconds, of the simulated
            work time. The defaults reproduce the original 1-3 s delay; pass
            ``0``/``0`` to run instantly (e.g. in tests).

    Returns:
        ``n * 2``.
    """
    print(f"任务 {n} 开始执行")
    # random.randint is inclusive on both ends, matching the original 1..3.
    time.sleep(random.randint(min_delay, max_delay))
    print(f"任务 {n} 完成")
    return n * 2
# Pool with at most three worker threads; leaving the `with` block waits for
# outstanding work and shuts the pool down.
with ThreadPoolExecutor(max_workers=3) as executor:
    # One task on its own: submit it, then block until its value is ready.
    future = executor.submit(task, 1)
    print(f"任务返回结果: {future.result()}")
    # Tasks 2..5 submitted together so they can run concurrently.
    futures = list(map(lambda n: executor.submit(task, n), range(2, 6)))
    # Gather the values in submission order (each result() call blocks).
    results = [pending.result() for pending in futures]
    print(f"所有结果: {results}")

批量处理数据的案例

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def download_image(url):
    """Download *url* and return the size of the response body in bytes.

    Returns:
        The body length as an ``int`` on success, or a string starting with
        ``"下载失败: "`` that describes the error. HTTP error statuses
        (4xx/5xx) count as failures via ``raise_for_status()``, so an error
        page is no longer reported as a successfully downloaded image.
    """
    try:
        response = requests.get(url, timeout=10)
        response.raise_for_status()
        return len(response.content)
    except requests.RequestException as e:
        # Base class of the exceptions requests raises (connection errors,
        # timeouts, invalid URLs, HTTPError from raise_for_status, ...).
        return f"下载失败: {e}"
# Image URLs to download (example.com placeholders image1..image3).
image_urls = [f"https://example.com/image{k}.jpg" for k in range(1, 4)]
def batch_download():
    """Download every URL in ``image_urls`` concurrently and report each outcome.

    ``download_image`` signals failure by *returning* an error string rather
    than raising. A string result is therefore reported as a failure here
    instead of being printed as a "size". The ``except`` branch still covers
    anything ``future.result()`` itself raises.
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        # Map each future back to its URL so results can be labelled.
        future_to_url = {
            executor.submit(download_image, url): url
            for url in image_urls
        }
        # Handle downloads as they finish, not in submission order.
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                size = future.result()
            except Exception as e:
                print(f"{url} 下载失败: {e}")
                continue
            if isinstance(size, str):
                # Error message returned by download_image ("下载失败: ...").
                print(f"{url} {size}")
            else:
                print(f"{url} 下载完成,大小: {size} 字节")
# Run the batch-download demo (module-level call, so it runs whenever this
# snippet is executed or imported).
batch_download()

map()函数的高效用法

from concurrent.futures import ThreadPoolExecutor
import time
def process_data(data, delay=1):
    """Simulate processing one item and return a label for it.

    Args:
        data: The item to process; it is interpolated into the result.
        delay: Seconds of simulated work (default 1, as before). Pass ``0``
            to run instantly, e.g. in tests.

    Returns:
        ``"处理后的: <data>"``.
    """
    time.sleep(delay)
    return f"处理后的: {data}"
def process_batch():
    """Process the integers 1..10 on a 5-thread pool and print each mapping.

    ``Executor.map`` yields results in input order. That lets each output be
    paired back with the input that produced it.
    """
    inputs = list(range(1, 11))  # 1 through 10
    with ThreadPoolExecutor(max_workers=5) as pool:
        outputs = list(pool.map(process_data, inputs))
    for src, dst in zip(inputs, outputs):
        print(f"{src} -> {dst}")
# Run the executor.map batch-processing demo.
process_batch()

实际应用:文件处理案例

from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import hashlib
import time
def calculate_file_hash(file_path):
    """Return ``(file_path, md5_hex)`` for the file at *file_path*.

    The file is read in 8192-byte chunks, so large files are never loaded
    into memory at once. Any error (missing file, permission denied, ...) is
    caught. In that case the second element is ``"错误: <exception>"``
    instead of a digest, so per-file failures never raise.
    """
    try:
        with open(file_path, 'rb') as handle:
            digest = hashlib.md5()
            # iter(callable, sentinel) keeps reading until read() returns b''.
            for block in iter(lambda: handle.read(8192), b''):
                digest.update(block)
        hex_value = digest.hexdigest()
    except Exception as err:
        return (file_path, f"错误: {err}")
    return (file_path, hex_value)
def process_files_parallel(file_list):
    """Hash every path in *file_list* on a 4-thread pool.

    A progress line is printed for each file as soon as its hash finishes.
    Returns a dict mapping each path to its MD5 hex digest, or to the error
    string produced by ``calculate_file_hash`` for that path.
    """
    print(f"开始处理 {len(file_list)} 个文件...")
    hashes = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        submitted = [pool.submit(calculate_file_hash, p) for p in file_list]
        for done in as_completed(submitted):
            path, digest = done.result()
            hashes[path] = digest
            print(f"已完成: {os.path.basename(path)} -> {digest[:8]}...")
    return hashes
# Example usage
if __name__ == "__main__":
    # Placeholder file names. Paths that cannot be opened are reported with
    # an error string instead of a digest.
    test_files = [f"file{k}.txt" for k in range(1, 4)]
    results = process_files_parallel(test_files)

带进度条的多线程处理

from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
import random
def worker_with_progress(worker_id, progress_bar, min_delay=0.5, max_delay=2):
    """Simulate one unit of work, then advance *progress_bar* by one step.

    Args:
        worker_id: Identifier included in the returned message.
        progress_bar: Any object with an ``update(n)`` method (a ``tqdm`` bar
            in this example).
        min_delay: Lower bound of the simulated work time in seconds.
        max_delay: Upper bound of the simulated work time in seconds. The
            defaults keep the original 0.5-2 s range; pass ``0``/``0`` for
            instant execution.

    Returns:
        ``"Worker <worker_id> completed"``.
    """
    time.sleep(random.uniform(min_delay, max_delay))
    progress_bar.update(1)
    return f"Worker {worker_id} completed"
def parallel_with_progress():
    """Run 20 simulated jobs on 5 threads with a shared tqdm progress bar.

    Each worker advances the bar itself (see ``worker_with_progress``). This
    function only gathers the return values in completion order and prints
    how many jobs finished.
    """
    total_tasks = 20
    with tqdm(total=total_tasks, desc="处理进度") as pbar, \
            ThreadPoolExecutor(max_workers=5) as executor:
        pending = [
            executor.submit(worker_with_progress, job_id, pbar)
            for job_id in range(total_tasks)
        ]
        results = [done.result() for done in as_completed(pending)]
    print(f"所有任务完成: {len(results)} 个")
# Run the progress-bar demo (uses the tqdm package imported above).
parallel_with_progress()

线程池的异常处理

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def risky_task(n, delay=1):
    """Return ``n ** 2`` after a simulated delay; task number 3 always fails.

    Args:
        n: Task number to square.
        delay: Seconds to sleep before returning (default 1, as before).
            Pass ``0`` for instant execution.

    Raises:
        ValueError: when ``n == 3``. It is raised immediately, before
            sleeping.
    """
    if n == 3:
        raise ValueError(f"任务 {n} 出错了!")
    time.sleep(delay)
    return n ** 2
def safe_execute():
    """Run ``risky_task(0..4)`` on 3 threads, separating successes from failures.

    A failing task does not stop the others. Its exception is caught when
    ``result()`` re-raises it and is recorded instead.

    Returns:
        ``(results, errors)``: lists of ``(task_id, value)`` and
        ``(task_id, error_message)`` tuples, each in completion order.
    """
    results, errors = [], []
    with ThreadPoolExecutor(max_workers=3) as executor:
        id_of = {executor.submit(risky_task, i): i for i in range(5)}
        for done in as_completed(id_of):
            task_id = id_of[done]
            try:
                value = done.result()
                results.append((task_id, value))
                print(f"任务 {task_id} 成功: {value}")
            except Exception as exc:
                errors.append((task_id, str(exc)))
                print(f"任务 {task_id} 失败: {exc}")
    print(f"\n成功: {len(results)} 个")
    print(f"失败: {len(errors)} 个")
    return results, errors
# Run the exception-handling demo.
safe_execute()

实际应用:Web爬虫案例

from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from bs4 import BeautifulSoup
import time
class WebScraper:
    """Fetch a batch of pages concurrently and extract each page's <title>.

    The instance owns a ``ThreadPoolExecutor`` and a ``requests.Session``.
    Release them with :meth:`close`, or use the scraper as a context manager
    (``with WebScraper() as s: ...``). ``__del__`` is kept only as a
    best-effort fallback, because finalizers are not guaranteed to run
    promptly.
    """

    def __init__(self, max_workers=5):
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # NOTE(review): this one Session is shared by all worker threads.
        # requests does not document Session as thread-safe; confirm this is
        # acceptable or switch to a Session per thread.
        self.session = requests.Session()

    def fetch_page(self, url):
        """Download *url*.

        Returns:
            ``(url, html_text)`` on success. Returns ``(url, "错误: <exception>")``
            if the request fails or the response status is an HTTP error
            (``raise_for_status``).
        """
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return (url, response.text)
        except Exception as e:
            return (url, f"错误: {e}")

    def process_page(self, url, html):
        """Extract the page title from *html*.

        Error strings from :meth:`fetch_page` (prefix ``"错误"``) are passed
        through unchanged. Returns ``(url, title)``; the title is ``"无标题"``
        when the page has no ``<title>`` or its ``.string`` is None.
        """
        if isinstance(html, str) and html.startswith("错误"):
            return (url, html)
        soup = BeautifulSoup(html, 'html.parser')
        # Tag.string is None for an empty <title> or one with child tags;
        # use the placeholder rather than returning None as the title.
        title = soup.title.string if soup.title is not None else None
        if title is None:
            title = "无标题"
        return (url, title)

    def scrape_pages(self, urls):
        """Fetch all *urls* concurrently, then parse the fetched pages concurrently.

        Returns a list of ``(url, title_or_error)`` tuples in completion
        order. Duplicate URLs are fetched once per occurrence but kept once,
        because ``pages`` is keyed by URL.
        """
        results = []
        # Phase 1: fetch every page in parallel.
        print("开始抓取页面...")
        fetch_futures = {
            self.executor.submit(self.fetch_page, url): url
            for url in urls
        }
        pages = {}
        for future in as_completed(fetch_futures):
            url, content = future.result()
            pages[url] = content
            print(f"获取完成: {url}")
        # Phase 2: parse the pages on the same pool.
        # NOTE(review): HTML parsing is CPU work, so under the GIL this phase
        # likely gains little from threads.
        print("\n开始处理页面...")
        process_futures = {
            self.executor.submit(self.process_page, url, content): url
            for url, content in pages.items()
        }
        for future in as_completed(process_futures):
            url, result = future.result()
            results.append((url, result))
            print(f"处理完成: {url} -> {result}")
        return results

    def close(self):
        """Shut down the pool (waiting for running tasks) and close the session.

        Tolerates a partially initialised instance: missing attributes are
        skipped.
        """
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=True)
        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never suppress exceptions

    def __del__(self):
        # Best-effort fallback only. getattr guards against __init__ having
        # failed before `executor` was assigned; wait=False avoids blocking
        # inside a finalizer.
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=False)
# Example usage
if __name__ == "__main__":
    scraper = WebScraper(max_workers=3)
    test_urls = [
        "https://example.com",
        "https://httpbin.org/html",
        "https://google.com",
    ]
    try:
        results = scraper.scrape_pages(test_urls)
    finally:
        # Release the worker threads and the session explicitly instead of
        # relying on WebScraper.__del__, which may run late or not at all.
        scraper.executor.shutdown(wait=True)
        scraper.session.close()

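线程池性能对比

注意:下面示例中的 work 函数是纯Python的CPU密集型计算,受GIL限制,线程池版本通常不会比串行版本更快,甚至可能更慢;线程池的加速效果主要体现在I/O密集型任务(网络请求、文件读写)上。CPU密集型任务请使用ProcessPoolExecutor(多进程)。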

from concurrent.futures import ThreadPoolExecutor
import time
import random
def measure_performance(n_tasks=10, iterations=1000000, max_workers=5):
    """Compare serial vs. thread-pool execution time for a CPU-bound workload.

    The workload is pure-Python arithmetic. On a standard CPython build (with
    the GIL), the thread pool is therefore typically *not* faster than the
    serial loop; threads pay off for I/O-bound work. Use
    ``ProcessPoolExecutor`` to parallelise CPU-bound work.

    Args:
        n_tasks: Number of workload invocations per run (default 10).
        iterations: Random floats summed per invocation (default 1,000,000).
        max_workers: Thread-pool size for the parallel run (default 5).

    Returns:
        ``(sequential_time, parallel_time)`` in seconds.
    """
    def work(_task_id):
        """Simulated CPU load: sum ``iterations`` pseudo-random floats."""
        total = 0
        for _ in range(iterations):
            total += random.random()
        return total

    # perf_counter() is monotonic and high-resolution, unlike time.time(),
    # which can jump if the system clock is adjusted.
    start_time = time.perf_counter()
    for i in range(n_tasks):
        work(i)
    sequential_time = time.perf_counter() - start_time

    start_time = time.perf_counter()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Consume the iterator so an exception raised in a worker surfaces here.
        list(executor.map(work, range(n_tasks)))
    parallel_time = time.perf_counter() - start_time

    print(f"串行执行时间: {sequential_time:.2f} 秒")
    print(f"并行执行时间: {parallel_time:.2f} 秒")
    # Guard against a zero-length measurement before dividing.
    speedup = sequential_time / parallel_time if parallel_time > 0 else float("inf")
    print(f"性能提升: {speedup:.1f}x")
    return sequential_time, parallel_time
# Run the serial vs. thread-pool timing comparison.
measure_performance()

使用建议

  1. 选择合适的线程数

    • I/O密集型任务:线程数可以较多(通常2-4倍CPU核心数)
    • CPU密集型任务:不建议用线程池,考虑多进程
  2. 异常处理

    • 始终使用try-except捕获future.result()的异常
    • 单个任务抛出的异常不会自动中断其他任务
  3. 资源管理

    • 使用with语句自动管理线程池生命周期
    • 避免创建过多线程导致资源耗尽
  4. 常用的线程池配置

    # Constructor calls shown for illustration only; in real code create the
    # pool in a `with` block so it is shut down when the work is done.
    # I/O-bound tasks: threads spend most of their time waiting, so a larger pool is reasonable
    ThreadPoolExecutor(max_workers=20)
    # Mixed workloads
    ThreadPoolExecutor(max_workers=10)
    # Lightweight tasks
    ThreadPoolExecutor(max_workers=4)

这些案例涵盖了线程池的大部分使用场景,你可以根据实际需求选择合适的实现方式。

抱歉,评论功能暂时关闭!