本文目录导读:

- 使用
multiprocessing.Process(最常用) - 使用进程池
Pool(适合批量任务) - 使用
concurrent.futures.ProcessPoolExecutor(更现代) - 进程间通信(Queue)
- 实际案例:批量图片处理
- 重要注意事项
在Python中开启多进程主要有三种方式,我会通过实际案例逐一说明。
使用 multiprocessing.Process(最常用)
import multiprocessing
import time
def worker(name, seconds):
"""模拟耗时任务"""
print(f"进程 {name} 开始工作")
time.sleep(seconds)
print(f"进程 {name} 工作完成")
return f"{name}的结果"
if __name__ == "__main__": # 重要:Windows系统必须有这个
# 创建进程
p1 = multiprocessing.Process(target=worker, args=("进程A", 2))
p2 = multiprocessing.Process(target=worker, args=("进程B", 3))
# 启动进程
p1.start()
p2.start()
print("主进程继续执行其他任务...")
# 等待子进程结束
p1.join()
p2.join()
print("所有进程执行完毕")
使用进程池 Pool(适合批量任务)
from multiprocessing import Pool
import time
def square(n):
"""计算平方数"""
time.sleep(1) # 模拟耗时
return n * n
if __name__ == "__main__":
# 创建进程池,默认使用CPU核心数
with Pool(processes=4) as pool:
# 方式1:map - 同步获取结果
numbers = [1, 2, 3, 4, 5]
results = pool.map(square, numbers)
print(f"同步结果: {results}")
# 方式2:apply_async - 异步获取结果
async_results = [pool.apply_async(square, (i,)) for i in range(6, 11)]
async_output = [res.get() for res in async_results]
print(f"异步结果: {async_output}")
# 方式3:imap - 惰性迭代结果
for result in pool.imap(square, range(11, 16)):
print(f"惰性结果: {result}")
使用 concurrent.futures.ProcessPoolExecutor(更现代)
from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def download_file(url):
"""模拟下载文件"""
time.sleep(2) # 模拟下载时间
return f"已下载: {url}"
if __name__ == "__main__":
urls = [
"http://example.com/file1.txt",
"http://example.com/file2.txt",
"http://example.com/file3.txt",
]
# 使用上下文管理器自动管理进程池
with ProcessPoolExecutor(max_workers=3) as executor:
# 提交所有任务
future_to_url = {executor.submit(download_file, url): url for url in urls}
# 处理完成的结果
for future in as_completed(future_to_url):
url = future_to_url[future]
try:
result = future.result()
print(f"成功: {result}")
except Exception as e:
print(f"失败: {url} 错误: {e}")
进程间通信(Queue)
from multiprocessing import Process, Queue
import time
def producer(queue):
"""生产者进程"""
for i in range(5):
item = f"数据-{i}"
queue.put(item)
print(f"生产: {item}")
time.sleep(0.5)
queue.put(None) # 发送结束信号
def consumer(queue):
"""消费者进程"""
while True:
item = queue.get()
if item is None:
break
print(f"消费: {item}")
time.sleep(1)
if __name__ == "__main__":
queue = Queue()
p1 = Process(target=producer, args=(queue,))
p2 = Process(target=consumer, args=(queue,))
p1.start()
p2.start()
p1.join()
p2.join()
print("生产消费完成")
实际案例:批量图片处理
from multiprocessing import Pool
import time
import os
def process_image(filename):
"""模拟图片处理"""
print(f"处理图片: {filename}")
time.sleep(1) # 模拟耗时处理
# 假设的处理结果
output_filename = f"processed_{filename}"
return output_filename
def batch_process_images(file_list, num_processes=None):
"""批量处理图片"""
if num_processes is None:
num_processes = os.cpu_count() # 使用CPU核心数
print(f"使用 {num_processes} 个进程处理 {len(file_list)} 张图片")
with Pool(processes=num_processes) as pool:
# 使用map获取结果
results = pool.map(process_image, file_list)
print(f"处理完成,共处理 {len(results)} 张图片")
return results
if __name__ == "__main__":
# 模拟100张图片
images = [f"image_{i:03d}.jpg" for i in range(100)]
start_time = time.time()
processed_images = batch_process_images(images, num_processes=4)
end_time = time.time()
print(f"总耗时: {end_time - start_time:.2f} 秒")
重要注意事项
__name__ == "__main__":Windows系统必须使用这个保护,否则会递归创建进程- 资源管理:使用
with语句(上下文管理器)自动管理进程池 - 进程间通信:使用
Queue、Pipe等,不要共享全局变量 - GIL限制:多进程适合CPU密集型任务,I/O密集型可用多线程
- 内存消耗:每个进程有独立的内存空间,注意不要消耗过多内存
选择哪种方式取决于你的具体需求:
- 少量进程:用
Process类 - 批量任务:用
Pool或ProcessPoolExecutor - 需要进程通信:用
Queue或Pipe - 更现代API:推荐
concurrent.futures模块