Python案例怎么实现进程通信?

wen python案例 11

本文目录导读:

Python案例怎么实现进程通信?

  1. Queue(队列)实现进程通信
  2. Pipe(管道)实现进程通信
  3. 共享内存(Value/Array)
  4. Manager(管理器)实现复杂数据结构共享
  5. 进程池(Pool)通信示例
  6. 选择指南
  7. 注意事项

Python 实现进程间通信(IPC)有多种方式,这里我为你介绍几种最常用的方法,并附上完整案例。

Queue(队列)实现进程通信

Queue 是进程安全的,适合生产者-消费者模式。

import multiprocessing as mp
import time
import random
def producer(queue, name):
    """生产者进程"""
    for i in range(5):
        item = f"{name}-产品-{i}"
        queue.put(item)
        print(f"[生产者] {name} 生产了: {item}")
        time.sleep(random.uniform(0.5, 1.5))
    # 发送结束信号
    queue.put(None)
def consumer(queue, name):
    """消费者进程"""
    while True:
        item = queue.get()
        if item is None:  # 收到结束信号
            break
        print(f"[消费者] {name} 消费了: {item}")
        time.sleep(random.uniform(0.8, 1.2))
if __name__ == "__main__":
    # 创建队列
    queue = mp.Queue()
    # 创建进程
    p1 = mp.Process(target=producer, args=(queue, "工厂A"))
    p2 = mp.Process(target=producer, args=(queue, "工厂B"))
    c1 = mp.Process(target=consumer, args=(queue, "消费者1"))
    c2 = mp.Process(target=consumer, args=(queue, "消费者2"))
    # 启动进程
    p1.start()
    p2.start()
    c1.start()
    c2.start()
    # 等待结束
    p1.join()
    p2.join()
    c1.join()
    c2.join()
    print("所有进程通信结束")

Pipe(管道)实现进程通信

Pipe 适用于两个进程之间的双向通信。

import multiprocessing as mp
import time
def sender(conn, name):
    """发送者"""
    messages = ["你好", "世界", "Python", "进程", "通信"]
    for msg in messages:
        conn.send(f"{name}: {msg}")
        print(f"[发送者] {name} 发送: {msg}")
        time.sleep(1)
    # 接收回复
    while conn.poll(0.1):  # 检查是否有数据可读
        reply = conn.recv()
        print(f"[发送者] 收到回复: {reply}")
    conn.close()
def receiver(conn, name):
    """接收者"""
    while True:
        try:
            # 接收消息,超时设置为2秒
            message = conn.recv()
            print(f"[接收者] {name} 收到: {message}")
            # 回复确认
            conn.send(f"{name} 已确认收到: {message[:10]}...")
        except EOFError:
            print(f"[接收者] {name} 连接已关闭")
            break
    conn.close()
if __name__ == "__main__":
    # 创建管道,返回(conn1, conn2)
    parent_conn, child_conn = mp.Pipe()
    # 创建进程
    p_sender = mp.Process(target=sender, args=(parent_conn, "进程A"))
    p_receiver = mp.Process(target=receiver, args=(child_conn, "进程B"))
    # 启动进程
    p_sender.start()
    p_receiver.start()
    # 等待结束
    p_sender.join()
    # 给接收者一些时间处理
    time.sleep(0.5)
    p_receiver.terminate()  # 强制结束接收进程
    print("管道通信结束")

共享内存(Value/Array)

适合传递简单数据类型的场景。

import multiprocessing as mp
import time
import random
def worker_counter(counter, lock, process_id):
    """工作进程-计数器"""
    for _ in range(5):
        with lock:  # 使用锁保证原子操作
            counter.value += 1
            print(f"[进程{process_id}] 计数器值: {counter.value}")
        time.sleep(random.uniform(0.5, 1.5))
def worker_writer(shared_array, lock, process_id):
    """工作进程-数组写入"""
    for i in range(3):
        with lock:
            index = i % len(shared_array)
            shared_array[index] = process_id * 100 + i
            print(f"[进程{process_id}] 写入数组[{index}]: {shared_array[index]}")
        time.sleep(random.uniform(0.3, 1.0))
if __name__ == "__main__":
    # 创建共享变量
    counter = mp.Value('i', 0)  # 'i'表示整数类型
    shared_array = mp.Array('i', 5)  # 长度为5的整数数组
    # 创建锁,保证数据一致性
    lock = mp.Lock()
    # 创建计数器进程
    processes = []
    for i in range(3):
        p = mp.Process(target=worker_counter, args=(counter, lock, i))
        processes.append(p)
        p.start()
    # 等待所有计数器进程结束
    for p in processes:
        p.join()
    print(f"\n最终计数器值: {counter.value}")
    # 创建数组写入进程
    processes = []
    for i in range(3):
        p = mp.Process(target=worker_writer, args=(shared_array, lock, i))
        processes.append(p)
        p.start()
    for p in processes:
        p.join()
    print(f"最终数组内容: {list(shared_array)}")

Manager(管理器)实现复杂数据结构共享

Manager 可以共享列表、字典等复杂数据结构。

import multiprocessing as mp
import time
import random
from datetime import datetime
def worker(shared_dict, shared_list, name, lock):
    """工作进程"""
    # 添加数据到共享字典
    with lock:
        shared_dict[name] = {
            'pid': mp.current_process().pid,
            'start_time': datetime.now().strftime('%H:%M:%S'),
            'items': []
        }
    # 模拟工作
    for i in range(3):
        time.sleep(random.uniform(0.5, 1.5))
        item = f"{name}-任务-{i}"
        with lock:
            shared_dict[name]['items'].append(item)
            shared_list.append(item)
            print(f"[{name}] 添加了: {item}")
            # 打印当前状态
            print(f"  当前字典: {shared_dict}")
            print(f"  当前列表最后3项: {shared_list[-3:]}")
if __name__ == "__main__":
    # 使用Manager创建共享对象
    with mp.Manager() as manager:
        shared_dict = manager.dict()  # 共享字典
        shared_list = manager.list()  # 共享列表
        lock = manager.Lock()  # 共享锁
        # 创建工作进程
        processes = []
        names = ['进程A', '进程B', '进程C']
        for name in names:
            p = mp.Process(target=worker, args=(shared_dict, shared_list, name, lock))
            processes.append(p)
            p.start()
        # 等待所有进程结束
        for p in processes:
            p.join()
        print("\n=== 最终结果 ===")
        print(f"共享字典: {dict(shared_dict)}")
        print(f"共享列表: {list(shared_list)}")

进程池(Pool)通信示例

import multiprocessing as mp
import time
def worker_task(data):
    """工作函数"""
    print(f"[进程{mp.current_process().pid}] 处理数据: {data}")
    time.sleep(1)
    return f"结果: {data ** 2}"
def process_result(result):
    """处理结果的回调函数"""
    print(f"回调函数收到: {result}")
if __name__ == "__main__":
    # 创建进程池
    with mp.Pool(processes=4) as pool:
        data = [1, 2, 3, 4, 5, 6, 7, 8]
        # 方法1: map - 同步处理
        print("=== 同步处理 ===")
        results = pool.map(worker_task, data)
        print(f"同步结果: {results}")
        # 方法2: apply_async - 异步处理
        print("\n=== 异步处理 ===")
        async_results = []
        for item in data:
            result = pool.apply_async(worker_task, args=(item,), callback=process_result)
            async_results.append(result)
        # 等待所有异步任务完成
        pool.close()
        pool.join()
        # 获取所有结果
        final_results = [r.get() for r in async_results]
        print(f"\n所有异步结果: {final_results}")

选择指南

通信方式 适用场景 特点
Queue 生产-消费模式 安全、易用、支持多生产多消费
Pipe 双进程通信 高效、支持双向通信
Value/Array 简单数据类型共享 性能最好、需加锁
Manager 复杂数据结构共享 功能强大、性能略低
Pool 任务分发与结果收集 简化并行处理

注意事项

  1. 死锁预防:使用Queue时注意put/get顺序,避免阻塞
  2. 资源管理:使用with语句或显式close/join
  3. 数据一致性:共享内存需要锁保护
  4. 性能考虑:Queue比Manager性能好,Pipe比Queue快

这些示例涵盖了Python进程通信的主要场景,你可以根据实际需求选择合适的方式。

抱歉,评论功能暂时关闭!