Python案例中的多进程怎么用?

wen python案例 3

本文目录导读:

Python案例中的多进程怎么用?

  1. 基础用法:创建和启动进程
  2. 进程间通信:队列(Queue)
  3. 进程池(Pool)的使用
  4. 共享内存(Value和Array)
  5. 实战案例:批量下载文件
  6. 进程同步:Event

在Python中,多进程主要通过multiprocessing模块实现,下面我会用多个案例来展示多进程的常用方法。

基础用法:创建和启动进程

import multiprocessing
import time
import os
def worker(name):
    """进程要执行的任务"""
    print(f"进程 {name} 开始工作,PID: {os.getpid()}")
    time.sleep(2)
    print(f"进程 {name} 工作完成")
if __name__ == '__main__':
    # 创建多个进程
    processes = []
    for i in range(3):
        p = multiprocessing.Process(target=worker, args=(f'Worker-{i}',))
        processes.append(p)
        p.start()
    # 等待所有进程完成
    for p in processes:
        p.join()
    print("所有进程执行完毕")

进程间通信:队列(Queue)

import multiprocessing
import time
import random
def producer(queue, items):
    """生产者进程"""
    for i in range(items):
        item = random.randint(1, 100)
        queue.put(item)
        print(f"生产者放入: {item}")
        time.sleep(0.5)
    queue.put(None)  # 发送结束信号
def consumer(queue):
    """消费者进程"""
    while True:
        item = queue.get()
        if item is None:
            break
        print(f"消费者取出: {item}, 处理结果: {item * 2}")
        time.sleep(0.8)
if __name__ == '__main__':
    # 创建队列
    queue = multiprocessing.Queue()
    # 创建生产者和消费者进程
    p1 = multiprocessing.Process(target=producer, args=(queue, 5))
    p2 = multiprocessing.Process(target=consumer, args=(queue,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("生产消费完成")

进程池(Pool)的使用

import multiprocessing
import time
def calculate_square(n):
    """计算平方"""
    time.sleep(1)  # 模拟耗时操作
    print(f"计算 {n} 的平方 = {n * n}")
    return n * n
def calculate_cube(n):
    """计算立方"""
    time.sleep(1)
    print(f"计算 {n} 的立方 = {n ** 3}")
    return n ** 3
if __name__ == '__main__':
    numbers = list(range(1, 11))
    # 创建进程池,默认使用CPU核心数
    with multiprocessing.Pool(processes=4) as pool:
        # 方式1: map - 批量处理
        results = pool.map(calculate_square, numbers)
        print(f"map结果: {results}")
        print("\n" + "="*30 + "\n")
        # 方式2: apply_async - 异步执行
        async_results = []
        for num in numbers[:5]:
            result = pool.apply_async(calculate_cube, (num,))
            async_results.append(result)
        # 获取结果
        for result in async_results:
            print(f"异步结果: {result.get()}")
        print("\n" + "="*30 + "\n")
        # 方式3: starmap - 处理多参数
        params = [(1, 2), (3, 4), (5, 6)]
        def add(a, b):
            time.sleep(0.5)
            return a + b
        results = pool.starmap(add, params)
        print(f"starmap结果: {results}")

共享内存(Value和Array)

import multiprocessing
import time
def increment(counter, lock):
    """递增计数器"""
    for _ in range(100):
        with lock:  # 使用锁避免竞态条件
            counter.value += 1
        time.sleep(0.001)
def decrement(counter, lock):
    """递减计数器"""
    for _ in range(100):
        with lock:
            counter.value -= 1
        time.sleep(0.001)
if __name__ == '__main__':
    # 创建共享变量和锁
    counter = multiprocessing.Value('i', 0)
    lock = multiprocessing.Lock()
    # 创建10个进程同时操作共享变量
    processes = []
    for i in range(5):
        p1 = multiprocessing.Process(target=increment, args=(counter, lock))
        p2 = multiprocessing.Process(target=decrement, args=(counter, lock))
        processes.extend([p1, p2])
        p1.start()
        p2.start()
    for p in processes:
        p.join()
    print(f"最终计数器值: {counter.value}")  # 应该是0

实战案例:批量下载文件

import multiprocessing
import time
import requests
def download_file(url, save_path):
    """下载文件"""
    try:
        print(f"开始下载: {url}")
        response = requests.get(url, timeout=10)
        with open(save_path, 'wb') as f:
            f.write(response.content)
        print(f"下载完成: {save_path}")
        return True
    except Exception as e:
        print(f"下载失败 {url}: {e}")
        return False
def batch_download():
    """批量下载示例"""
    # 模拟下载任务
    download_tasks = [
        ("http://example.com/file1.jpg", "file1.jpg"),
        ("http://example.com/file2.jpg", "file2.jpg"),
        ("http://example.com/file3.jpg", "file3.jpg"),
        ("http://example.com/file4.jpg", "file4.jpg"),
        ("http://example.com/file5.jpg", "file5.jpg"),
    ]
    start_time = time.time()
    # 使用进程池并行下载
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(download_file, download_tasks)
    end_time = time.time()
    print(f"总耗时: {end_time - start_time:.2f}秒")
    print(f"成功下载: {sum(results)}/{len(results)}")
if __name__ == '__main__':
    batch_download()

进程同步:Event

import multiprocessing
import time
import random
def worker_initializer(event):
    """等待信号后开始工作"""
    print("进程准备就绪,等待开始信号...")
    event.wait()  # 等待事件触发
    print("收到开始信号,开始工作")
    time.sleep(random.uniform(0.5, 1.5))
    print("工作完成")
if __name__ == '__main__':
    # 创建事件对象
    start_event = multiprocessing.Event()
    # 创建多个进程
    processes = []
    for i in range(5):
        p = multiprocessing.Process(target=worker_initializer, args=(start_event,))
        processes.append(p)
        p.start()
    print("主进程准备中...")
    time.sleep(2)
    print("主进程发出开始信号!")
    start_event.set()  # 触发事件
    for p in processes:
        p.join()
    print("所有进程完成")
  1. 使用 if __name__ == '__main__':在Windows上必须,避免无限递归创建进程
  2. 进程间通信:使用 QueuePipeManager
  3. 进程池Pool 可以自动管理进程,常用方法有 mapapply_asyncstarmap
  4. 共享数据:使用 ValueArrayManager.dict()
  5. 同步机制LockEventSemaphore 等控制进程同步
  6. 避免全局共享:多进程不共享全局变量,需要使用特殊机制

这些例子覆盖了多进程编程的主要场景,可以根据具体需求选择合适的实现方式。

抱歉,评论功能暂时关闭!