Python案例如何开启多进程?

wen python案例 11

本文目录导读:

Python案例如何开启多进程?

  1. 使用 multiprocessing.Process(最常用)
  2. 使用进程池 Pool(适合批量任务)
  3. 使用 concurrent.futures.ProcessPoolExecutor(更现代)
  4. 进程间通信(Queue)
  5. 实际案例:批量图片处理
  6. 重要注意事项

在Python中开启多进程主要有三种方式,我会通过实际案例逐一说明。

使用 multiprocessing.Process(最常用)

import multiprocessing
import time
def worker(name, seconds):
    """模拟耗时任务"""
    print(f"进程 {name} 开始工作")
    time.sleep(seconds)
    print(f"进程 {name} 工作完成")
    return f"{name}的结果"
if __name__ == "__main__":  # 重要:Windows系统必须有这个
    # 创建进程
    p1 = multiprocessing.Process(target=worker, args=("进程A", 2))
    p2 = multiprocessing.Process(target=worker, args=("进程B", 3))
    # 启动进程
    p1.start()
    p2.start()
    print("主进程继续执行其他任务...")
    # 等待子进程结束
    p1.join()
    p2.join()
    print("所有进程执行完毕")

使用进程池 Pool(适合批量任务)

from multiprocessing import Pool
import time
def square(n):
    """计算平方数"""
    time.sleep(1)  # 模拟耗时
    return n * n
if __name__ == "__main__":
    # 创建进程池,默认使用CPU核心数
    with Pool(processes=4) as pool:
        # 方式1:map - 同步获取结果
        numbers = [1, 2, 3, 4, 5]
        results = pool.map(square, numbers)
        print(f"同步结果: {results}")
        # 方式2:apply_async - 异步获取结果
        async_results = [pool.apply_async(square, (i,)) for i in range(6, 11)]
        async_output = [res.get() for res in async_results]
        print(f"异步结果: {async_output}")
        # 方式3:imap - 惰性迭代结果
        for result in pool.imap(square, range(11, 16)):
            print(f"惰性结果: {result}")

使用 concurrent.futures.ProcessPoolExecutor(更现代)

from concurrent.futures import ProcessPoolExecutor, as_completed
import time
def download_file(url):
    """模拟下载文件"""
    time.sleep(2)  # 模拟下载时间
    return f"已下载: {url}"
if __name__ == "__main__":
    urls = [
        "http://example.com/file1.txt",
        "http://example.com/file2.txt",
        "http://example.com/file3.txt",
    ]
    # 使用上下文管理器自动管理进程池
    with ProcessPoolExecutor(max_workers=3) as executor:
        # 提交所有任务
        future_to_url = {executor.submit(download_file, url): url for url in urls}
        # 处理完成的结果
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            try:
                result = future.result()
                print(f"成功: {result}")
            except Exception as e:
                print(f"失败: {url} 错误: {e}")

进程间通信(Queue)

from multiprocessing import Process, Queue
import time
def producer(queue):
    """生产者进程"""
    for i in range(5):
        item = f"数据-{i}"
        queue.put(item)
        print(f"生产: {item}")
        time.sleep(0.5)
    queue.put(None)  # 发送结束信号
def consumer(queue):
    """消费者进程"""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费: {item}")
        time.sleep(1)
if __name__ == "__main__":
    queue = Queue()
    p1 = Process(target=producer, args=(queue,))
    p2 = Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("生产消费完成")

实际案例:批量图片处理

from multiprocessing import Pool
import time
import os
def process_image(filename):
    """模拟图片处理"""
    print(f"处理图片: {filename}")
    time.sleep(1)  # 模拟耗时处理
    # 假设的处理结果
    output_filename = f"processed_{filename}"
    return output_filename
def batch_process_images(file_list, num_processes=None):
    """批量处理图片"""
    if num_processes is None:
        num_processes = os.cpu_count()  # 使用CPU核心数
    print(f"使用 {num_processes} 个进程处理 {len(file_list)} 张图片")
    with Pool(processes=num_processes) as pool:
        # 使用map获取结果
        results = pool.map(process_image, file_list)
    print(f"处理完成,共处理 {len(results)} 张图片")
    return results
if __name__ == "__main__":
    # 模拟100张图片
    images = [f"image_{i:03d}.jpg" for i in range(100)]
    start_time = time.time()
    processed_images = batch_process_images(images, num_processes=4)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f} 秒")

重要注意事项

  1. __name__ == "__main__":Windows系统必须使用这个保护,否则会递归创建进程
  2. 资源管理:使用 with 语句(上下文管理器)自动管理进程池
  3. 进程间通信:使用 QueuePipe 等,不要共享全局变量
  4. GIL限制:多进程适合CPU密集型任务,I/O密集型可用多线程
  5. 内存消耗:每个进程有独立的内存空间,注意不要消耗过多内存

选择哪种方式取决于你的具体需求:

  • 少量进程:用 Process
  • 批量任务:用 PoolProcessPoolExecutor
  • 需要进程通信:用 QueuePipe
  • 更现代API:推荐 concurrent.futures 模块

抱歉,评论功能暂时关闭!