本文目录导读:

- 基础线程池使用(ThreadPoolExecutor基础)
- 批量处理数据的案例
- map()函数的高效用法
- 实际应用:文件处理案例
- 带进度条的多线程处理
- 线程池的异常处理
- 实际应用:Web爬虫案例
- 线程池性能对比
- 使用建议
我来详细讲解Python线程池的使用方法,包含多个实用案例。
基础线程池使用(ThreadPoolExecutor基础)
基本示例
from concurrent.futures import ThreadPoolExecutor
import time
import random
def task(n, *, delay_range=(1, 3)):
    """Simulate a slow task and return twice its input.

    Prints a start message, sleeps for a random whole number of seconds
    drawn from ``delay_range`` (both ends inclusive, via ``random.randint``)
    to mimic blocking work, prints a completion message, and returns.

    Args:
        n: Task identifier; also the value that gets doubled.
        delay_range: ``(low, high)`` integer bounds, in seconds, for the
            simulated work. Defaults to the original 1-3 seconds; pass
            ``(0, 0)`` for instant runs.

    Returns:
        ``n * 2``.
    """
    print(f"任务 {n} 开始执行")
    low, high = delay_range
    time.sleep(random.randint(low, high))  # simulated blocking operation
    print(f"任务 {n} 完成")
    return n * 2
# A pool that runs at most three tasks at the same time.
with ThreadPoolExecutor(max_workers=3) as pool:
    # Submit a single task and block until its value is available.
    single = pool.submit(task, 1)
    print(f"任务返回结果: {single.result()}")

    # Fan out a batch of tasks; results are gathered in submission order.
    pending = [pool.submit(task, job_id) for job_id in range(2, 6)]
    results = [job.result() for job in pending]

print(f"所有结果: {results}")
批量处理数据的案例
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
import time
def download_image(url):
    """Download ``url`` and return the size of the response body in bytes.

    Failures are reported in-band instead of raised. A network error, a
    timeout, or an HTTP error status (4xx/5xx) all produce a string that
    starts with ``"下载失败: "``.

    Args:
        url: Address of the image to fetch.

    Returns:
        ``int`` byte count on success; otherwise an error-description ``str``.
    """
    try:
        response = requests.get(url, timeout=10)
        # Without this check a 404/500 error page would be measured and
        # reported as if it were the image itself.
        response.raise_for_status()
        return len(response.content)
    except Exception as e:
        return f"下载失败: {e}"
# Placeholder image URLs used by the batch-download example.
image_urls = [f"https://example.com/image{index}.jpg" for index in range(1, 4)]
def batch_download(urls=None, max_workers=5):
    """Download several images concurrently and print each outcome.

    Args:
        urls: URLs to fetch. Defaults to the module-level ``image_urls``.
        max_workers: Maximum number of concurrent download threads.

    Returns:
        dict mapping each URL to its size in bytes (``int``) or to an
        error message (``str``).
    """
    if urls is None:
        urls = image_urls
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit every download; keep a reverse map to recover the URL.
        future_to_url = {
            executor.submit(download_image, url): url
            for url in urls
        }
        # as_completed yields futures in completion order, not submission order.
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                size = future.result()
            except Exception as e:
                results[url] = f"下载失败: {e}"
                print(f"{url} 下载失败: {e}")
                continue
            results[url] = size
            # download_image reports failures by *returning* a message string
            # instead of raising, so only an int is an actual size.
            if isinstance(size, int):
                print(f"{url} 下载完成,大小: {size} 字节")
            else:
                print(f"{url} {size}")
    return results
# Run the batch-download example (module-level call: executes as soon as this snippet runs).
batch_download()
map()函数的高效用法
from concurrent.futures import ThreadPoolExecutor
import time
def process_data(data, *, delay=1):
    """Simulate processing one item and return a labelled result.

    Args:
        data: The item to process; it is only formatted into the result.
        delay: Seconds to sleep to mimic work. Defaults to the original
            1 second; pass ``0`` for instant runs.

    Returns:
        The string ``f"处理后的: {data}"``.
    """
    time.sleep(delay)
    return f"处理后的: {data}"
def process_batch():
    """Process the integers 1 through 10 on five threads and print each mapping.

    ``Executor.map`` yields results in input order, so zipping them with
    the inputs pairs every value with its own result.
    """
    items = [*range(1, 11)]
    with ThreadPoolExecutor(max_workers=5) as pool:
        outputs = list(pool.map(process_data, items))
    for item, output in zip(items, outputs):
        print(f"{item} -> {output}")
# Run the map() batch-processing example (module-level call).
process_batch()
实际应用:文件处理案例
from concurrent.futures import ThreadPoolExecutor, as_completed
import os
import hashlib
import time
def calculate_file_hash(file_path, algorithm="md5", chunk_size=8192):
    """Hash a file's contents, reading it in fixed-size chunks.

    Errors are reported in-band rather than raised, so one bad file does
    not abort a batch.

    Args:
        file_path: Path of the file to hash.
        algorithm: Any name accepted by ``hashlib.new``. Defaults to
            ``"md5"`` (the original behaviour). MD5 is fine for detecting
            changes but is not collision-resistant, so prefer ``"sha256"``
            when that matters. Variable-length digests (``shake_*``) are not
            supported because their ``hexdigest`` needs a length.
        chunk_size: Bytes read per iteration (positive int); bounds memory
            use for large files.

    Returns:
        ``(file_path, hex_digest)`` on success, or
        ``(file_path, "错误: <message>")`` if hashing or reading fails.
    """
    try:
        file_hash = hashlib.new(algorithm)
        with open(file_path, "rb") as f:
            # Read in chunks so large files are never loaded whole.
            while chunk := f.read(chunk_size):
                file_hash.update(chunk)
        return (file_path, file_hash.hexdigest())
    except Exception as e:
        return (file_path, f"错误: {e}")
def process_files_parallel(file_list, max_workers=4):
    """Hash many files concurrently and collect the results.

    Args:
        file_list: Iterable of file paths to hash.
        max_workers: Size of the thread pool (default 4).

    Returns:
        dict mapping each path to its hex digest, or to the ``"错误: ..."``
        message that ``calculate_file_hash`` returns when a file fails.
    """
    file_list = list(file_list)  # allow generators; len() is needed below
    print(f"开始处理 {len(file_list)} 个文件...")
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Each task returns its own path, so no future->path map is needed.
        futures = [
            executor.submit(calculate_file_hash, file_path)
            for file_path in file_list
        ]
        for future in as_completed(futures):
            file_path, file_hash = future.result()
            results[file_path] = file_hash
            name = os.path.basename(file_path)
            # Failures come back in-band as "错误: ..."; print the whole
            # message instead of slicing it like a hex digest and calling it done.
            if file_hash.startswith("错误"):
                print(f"处理失败: {name} -> {file_hash}")
            else:
                print(f"已完成: {name} -> {file_hash[:8]}...")
    return results
# Example usage: hash a few placeholder files when run as a script.
if __name__ == "__main__":
    test_files = [f"file{number}.txt" for number in range(1, 4)]
    results = process_files_parallel(test_files)
带进度条的多线程处理
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm
import time
import random
def worker_with_progress(worker_id, progress_bar, *, delay_range=(0.5, 2)):
    """Simulate one unit of work, then advance a shared progress bar by one.

    Args:
        worker_id: Identifier included in the returned message.
        progress_bar: Any object with an ``update(n)`` method (e.g. a
            ``tqdm`` bar shared by all workers).
        delay_range: ``(low, high)`` seconds passed to ``random.uniform``.
            Defaults to the original 0.5-2 s; pass ``(0, 0)`` for instant runs.

    Returns:
        ``f"Worker {worker_id} completed"``.
    """
    time.sleep(random.uniform(*delay_range))
    # NOTE(review): called from pool threads; assumes progress_bar.update is
    # safe to call concurrently.
    progress_bar.update(1)
    return f"Worker {worker_id} completed"
def parallel_with_progress():
    """Run 20 simulated jobs on five threads while a tqdm bar tracks progress.

    Each worker advances the shared bar itself; this function only gathers
    return values, in completion order, as the futures finish.
    """
    job_count = 20
    with tqdm(total=job_count, desc="处理进度") as bar:
        with ThreadPoolExecutor(max_workers=5) as pool:
            pending = [
                pool.submit(worker_with_progress, job, bar)
                for job in range(job_count)
            ]
            results = [done.result() for done in as_completed(pending)]
    print(f"所有任务完成: {len(results)} 个")
# Run the progress-bar example (module-level call).
parallel_with_progress()
线程池的异常处理
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
def risky_task(n):
    """Return ``n`` squared after a one-second pause.

    Raises:
        ValueError: when ``n`` equals 3. The error is raised before any
            sleep, to show how worker exceptions surface through
            ``Future.result()``.
    """
    should_fail = n == 3
    if should_fail:
        raise ValueError(f"任务 {n} 出错了!")
    time.sleep(1)
    squared = n ** 2
    return squared
def safe_execute():
    """Run ``risky_task`` for ids 0-4 on three threads, isolating failures.

    Each future's exception is caught on its own, so one failing task does
    not prevent the other results from being collected.

    Returns:
        ``(results, errors)``: lists of ``(task_id, value)`` and
        ``(task_id, error_message)`` tuples, in completion order.
    """
    succeeded = []
    failed = []
    with ThreadPoolExecutor(max_workers=3) as pool:
        id_by_future = {pool.submit(risky_task, job): job for job in range(5)}
        for done in as_completed(id_by_future):
            job = id_by_future[done]
            try:
                value = done.result()
                succeeded.append((job, value))
                print(f"任务 {job} 成功: {value}")
            except Exception as exc:
                failed.append((job, str(exc)))
                print(f"任务 {job} 失败: {exc}")
    print(f"\n成功: {len(succeeded)} 个")
    print(f"失败: {len(failed)} 个")
    return succeeded, failed
# Run the exception-handling example (module-level call).
safe_execute()
实际应用:Web爬虫案例
from concurrent.futures import ThreadPoolExecutor, as_completed
import requests
from bs4 import BeautifulSoup
import time
class WebScraper:
    """Fetch pages concurrently and extract each page's ``<title>`` text.

    The scraper owns a ``ThreadPoolExecutor`` and a ``requests.Session``.
    Release them deterministically with ``close()`` or by using the
    instance as a context manager; ``__del__`` is only a best-effort fallback.
    """

    def __init__(self, max_workers=5):
        """Create the worker pool and the HTTP session.

        Args:
            max_workers: Maximum number of threads used by ``scrape_pages``.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)
        # NOTE(review): one Session is shared by every worker thread; requests
        # does not document Session as thread-safe. Confirm this is acceptable.
        self.session = requests.Session()

    def fetch_page(self, url):
        """Download one page.

        Returns:
            ``(url, html_text)`` on success, or ``(url, "错误: <message>")``
            on any failure, including HTTP error statuses.
        """
        try:
            response = self.session.get(url, timeout=10)
            response.raise_for_status()
            return (url, response.text)
        except Exception as e:
            return (url, f"错误: {e}")

    def process_page(self, url, html):
        """Extract the title from fetched HTML.

        Error strings produced by ``fetch_page`` are passed through unchanged.

        Returns:
            ``(url, title)``, where the title is ``"无标题"`` if the page has
            no ``<title>`` or its title is empty; or ``(url, error_message)``.
        """
        if isinstance(html, str) and html.startswith("错误"):
            return (url, html)
        soup = BeautifulSoup(html, 'html.parser')
        if soup.title is None:
            return (url, "无标题")
        # ``.string`` is None when <title> is empty or contains nested markup;
        # fall back to its concatenated text rather than reporting None.
        title = soup.title.string
        if title is None:
            title = soup.title.get_text() or "无标题"
        return (url, title)

    def scrape_pages(self, urls):
        """Fetch all ``urls`` in parallel, then extract each page's title.

        Args:
            urls: Iterable of page URLs.

        Returns:
            List of ``(url, title_or_error)`` tuples in completion order.
        """
        results = []
        # Phase 1: fetch the pages concurrently.
        print("开始抓取页面...")
        fetch_futures = {
            self.executor.submit(self.fetch_page, url): url
            for url in urls
        }
        pages = {}
        for future in as_completed(fetch_futures):
            url, content = future.result()
            pages[url] = content
            print(f"获取完成: {url}")
        # Phase 2: parse the pages on the same pool. html.parser is
        # CPU-bound Python code, so threads likely gain little here.
        print("\n开始处理页面...")
        process_futures = {
            self.executor.submit(self.process_page, url, content): url
            for url, content in pages.items()
        }
        for future in as_completed(process_futures):
            url, result = future.result()
            results.append((url, result))
            print(f"处理完成: {url} -> {result}")
        return results

    def close(self, wait=True):
        """Shut down the thread pool and close the HTTP session.

        Safe to call more than once, and safe on a partially constructed
        instance (missing attributes are skipped).

        Args:
            wait: Forwarded to ``ThreadPoolExecutor.shutdown``.
        """
        executor = getattr(self, "executor", None)
        if executor is not None:
            executor.shutdown(wait=wait)
        session = getattr(self, "session", None)
        if session is not None:
            session.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()
        return False  # never suppress exceptions

    def __del__(self):
        # Best-effort fallback only; prefer close() or a with-statement.
        self.close(wait=False)
# Example usage: scrape a few pages when run as a script.
if __name__ == "__main__":
    scraper = WebScraper(max_workers=3)
    test_urls = [
        "https://example.com",
        "https://httpbin.org/html",
        "https://google.com",
    ]
    try:
        results = scraper.scrape_pages(test_urls)
    finally:
        # Release worker threads and pooled connections deterministically
        # instead of relying on garbage collection.
        scraper.executor.shutdown(wait=True)
        scraper.session.close()
线程池性能对比
from concurrent.futures import ThreadPoolExecutor
import time
import random
def measure_performance(n_tasks=10, max_workers=5, cpu_iterations=1_000_000, io_delay=0.2):
    """Compare sequential and thread-pool execution for CPU- and I/O-bound work.

    The original demo timed only a pure-Python CPU loop. On a standard
    (GIL) CPython build threads cannot execute that bytecode in parallel,
    so the reported "speed-up" was about 1x or worse, contradicting the
    demo's own message. This version times both workload kinds so the
    contrast is visible: ``time.sleep`` releases the GIL, so the simulated
    I/O tasks overlap.

    Args:
        n_tasks: Number of tasks in each run.
        max_workers: Thread-pool size for the parallel runs.
        cpu_iterations: ``random.random()`` additions per CPU task.
        io_delay: Seconds each simulated I/O task sleeps.

    Returns:
        ``{"cpu": {"sequential": s, "parallel": s}, "io": {...}}`` with
        wall-clock durations in seconds.
    """
    def cpu_work(_task_id):
        """Pure-Python loop: holds the GIL the whole time."""
        total = 0
        for _ in range(cpu_iterations):
            total += random.random()
        return total

    def io_work(_task_id):
        """Blocking wait that releases the GIL, like network or disk I/O."""
        time.sleep(io_delay)
        return io_delay

    def run(work):
        # perf_counter is monotonic and high-resolution; time.time() is not.
        start = time.perf_counter()
        for task_id in range(n_tasks):
            work(task_id)
        sequential = time.perf_counter() - start

        start = time.perf_counter()
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            list(executor.map(work, range(n_tasks)))
        parallel = time.perf_counter() - start
        return {"sequential": sequential, "parallel": parallel}

    timings = {"cpu": run(cpu_work), "io": run(io_work)}

    for label, key in (("CPU密集型任务", "cpu"), ("I/O密集型任务", "io")):
        sequential_time = timings[key]["sequential"]
        parallel_time = timings[key]["parallel"]
        # Guard against division by zero for trivially small runs.
        speedup = sequential_time / parallel_time if parallel_time > 0 else float("inf")
        print(f"[{label}]")
        print(f"串行执行时间: {sequential_time:.2f} 秒")
        print(f"并行执行时间: {parallel_time:.2f} 秒")
        print(f"性能提升: {speedup:.1f}x")
    return timings
# Run the performance comparison (module-level call; the default workload takes several seconds).
measure_performance()
使用建议
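- 选择合适的线程数:
  - I/O密集型任务:线程数可以较多(通常为CPU核心数的2-4倍;Python 3.8+ 中 ThreadPoolExecutor 的默认值为 min(32, CPU核心数 + 4))
  - CPU密集型任务:受GIL限制,线程池无法带来真正的并行加速,建议使用多进程(如 ProcessPoolExecutor)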
- 异常处理:
  - 始终使用try-except捕获future.result()抛出的异常
  - 单个任务抛出的异常不会自动中断其他任务,需要逐个检查每个future的结果
- 资源管理:
  - 使用with语句自动管理线程池的生命周期(退出时自动调用shutdown)
  - 避免创建过多线程导致资源耗尽
- 常用的线程池配置:
# I/O密集型任务
ThreadPoolExecutor(max_workers=20)
# 混合型任务
ThreadPoolExecutor(max_workers=10)
# 轻量级任务
ThreadPoolExecutor(max_workers=4)
这些案例涵盖了线程池的大部分使用场景,你可以根据实际需求选择合适的实现方式。