本文目录导读:

Python 实现进程间通信(IPC)有多种方式,这里我为你介绍几种最常用的方法,并附上完整案例。
Queue(队列)实现进程通信
Queue 是进程安全的,适合生产者-消费者模式。
import multiprocessing as mp
import time
import random
def producer(queue, name):
"""生产者进程"""
for i in range(5):
item = f"{name}-产品-{i}"
queue.put(item)
print(f"[生产者] {name} 生产了: {item}")
time.sleep(random.uniform(0.5, 1.5))
# 发送结束信号
queue.put(None)
def consumer(queue, name):
"""消费者进程"""
while True:
item = queue.get()
if item is None: # 收到结束信号
break
print(f"[消费者] {name} 消费了: {item}")
time.sleep(random.uniform(0.8, 1.2))
if __name__ == "__main__":
# 创建队列
queue = mp.Queue()
# 创建进程
p1 = mp.Process(target=producer, args=(queue, "工厂A"))
p2 = mp.Process(target=producer, args=(queue, "工厂B"))
c1 = mp.Process(target=consumer, args=(queue, "消费者1"))
c2 = mp.Process(target=consumer, args=(queue, "消费者2"))
# 启动进程
p1.start()
p2.start()
c1.start()
c2.start()
# 等待结束
p1.join()
p2.join()
c1.join()
c2.join()
print("所有进程通信结束")
Pipe(管道)实现进程通信
Pipe 适用于两个进程之间的双向通信。
import multiprocessing as mp
import time
def sender(conn, name):
"""发送者"""
messages = ["你好", "世界", "Python", "进程", "通信"]
for msg in messages:
conn.send(f"{name}: {msg}")
print(f"[发送者] {name} 发送: {msg}")
time.sleep(1)
# 接收回复
while conn.poll(0.1): # 检查是否有数据可读
reply = conn.recv()
print(f"[发送者] 收到回复: {reply}")
conn.close()
def receiver(conn, name):
"""接收者"""
while True:
try:
# 接收消息,超时设置为2秒
message = conn.recv()
print(f"[接收者] {name} 收到: {message}")
# 回复确认
conn.send(f"{name} 已确认收到: {message[:10]}...")
except EOFError:
print(f"[接收者] {name} 连接已关闭")
break
conn.close()
if __name__ == "__main__":
# 创建管道,返回(conn1, conn2)
parent_conn, child_conn = mp.Pipe()
# 创建进程
p_sender = mp.Process(target=sender, args=(parent_conn, "进程A"))
p_receiver = mp.Process(target=receiver, args=(child_conn, "进程B"))
# 启动进程
p_sender.start()
p_receiver.start()
# 等待结束
p_sender.join()
# 给接收者一些时间处理
time.sleep(0.5)
p_receiver.terminate() # 强制结束接收进程
print("管道通信结束")
共享内存(Value/Array)
适合传递简单数据类型的场景。
import multiprocessing as mp
import time
import random
def worker_counter(counter, lock, process_id):
"""工作进程-计数器"""
for _ in range(5):
with lock: # 使用锁保证原子操作
counter.value += 1
print(f"[进程{process_id}] 计数器值: {counter.value}")
time.sleep(random.uniform(0.5, 1.5))
def worker_writer(shared_array, lock, process_id):
"""工作进程-数组写入"""
for i in range(3):
with lock:
index = i % len(shared_array)
shared_array[index] = process_id * 100 + i
print(f"[进程{process_id}] 写入数组[{index}]: {shared_array[index]}")
time.sleep(random.uniform(0.3, 1.0))
if __name__ == "__main__":
# 创建共享变量
counter = mp.Value('i', 0) # 'i'表示整数类型
shared_array = mp.Array('i', 5) # 长度为5的整数数组
# 创建锁,保证数据一致性
lock = mp.Lock()
# 创建计数器进程
processes = []
for i in range(3):
p = mp.Process(target=worker_counter, args=(counter, lock, i))
processes.append(p)
p.start()
# 等待所有计数器进程结束
for p in processes:
p.join()
print(f"\n最终计数器值: {counter.value}")
# 创建数组写入进程
processes = []
for i in range(3):
p = mp.Process(target=worker_writer, args=(shared_array, lock, i))
processes.append(p)
p.start()
for p in processes:
p.join()
print(f"最终数组内容: {list(shared_array)}")
Manager(管理器)实现复杂数据结构共享
Manager 可以共享列表、字典等复杂数据结构。
import multiprocessing as mp
import time
import random
from datetime import datetime
def worker(shared_dict, shared_list, name, lock):
"""工作进程"""
# 添加数据到共享字典
with lock:
shared_dict[name] = {
'pid': mp.current_process().pid,
'start_time': datetime.now().strftime('%H:%M:%S'),
'items': []
}
# 模拟工作
for i in range(3):
time.sleep(random.uniform(0.5, 1.5))
item = f"{name}-任务-{i}"
with lock:
shared_dict[name]['items'].append(item)
shared_list.append(item)
print(f"[{name}] 添加了: {item}")
# 打印当前状态
print(f" 当前字典: {shared_dict}")
print(f" 当前列表最后3项: {shared_list[-3:]}")
if __name__ == "__main__":
# 使用Manager创建共享对象
with mp.Manager() as manager:
shared_dict = manager.dict() # 共享字典
shared_list = manager.list() # 共享列表
lock = manager.Lock() # 共享锁
# 创建工作进程
processes = []
names = ['进程A', '进程B', '进程C']
for name in names:
p = mp.Process(target=worker, args=(shared_dict, shared_list, name, lock))
processes.append(p)
p.start()
# 等待所有进程结束
for p in processes:
p.join()
print("\n=== 最终结果 ===")
print(f"共享字典: {dict(shared_dict)}")
print(f"共享列表: {list(shared_list)}")
进程池(Pool)通信示例
import multiprocessing as mp
import time
def worker_task(data):
"""工作函数"""
print(f"[进程{mp.current_process().pid}] 处理数据: {data}")
time.sleep(1)
return f"结果: {data ** 2}"
def process_result(result):
"""处理结果的回调函数"""
print(f"回调函数收到: {result}")
if __name__ == "__main__":
# 创建进程池
with mp.Pool(processes=4) as pool:
data = [1, 2, 3, 4, 5, 6, 7, 8]
# 方法1: map - 同步处理
print("=== 同步处理 ===")
results = pool.map(worker_task, data)
print(f"同步结果: {results}")
# 方法2: apply_async - 异步处理
print("\n=== 异步处理 ===")
async_results = []
for item in data:
result = pool.apply_async(worker_task, args=(item,), callback=process_result)
async_results.append(result)
# 等待所有异步任务完成
pool.close()
pool.join()
# 获取所有结果
final_results = [r.get() for r in async_results]
print(f"\n所有异步结果: {final_results}")
选择指南
| 通信方式 | 适用场景 | 特点 |
|---|---|---|
| Queue | 生产-消费模式 | 安全、易用、支持多生产多消费 |
| Pipe | 双进程通信 | 高效、支持双向通信 |
| Value/Array | 简单数据类型共享 | 性能最好、需加锁 |
| Manager | 复杂数据结构共享 | 功能强大、性能略低 |
| Pool | 任务分发与结果收集 | 简化并行处理 |
注意事项
- 死锁预防:使用Queue时注意put/get顺序,避免阻塞
- 资源管理:使用with语句或显式close/join
- 数据一致性:共享内存需要锁保护
- 性能考虑:Queue比Manager性能好,Pipe比Queue快
这些示例涵盖了Python进程通信的主要场景,你可以根据实际需求选择合适的方式。