本文目录导读:

- 使用Lock(互斥锁)
- 使用RLock(可重入锁)
- 使用Semaphore(信号量)
- 使用Event(事件)
- 使用Condition(条件变量)
- 使用Barrier(屏障)
- 实用示例:线程安全的计数器
- 选择合适的同步机制
在Python中实现线程同步,主要使用threading模块提供的多种同步机制,我来介绍几种常见的方法:
使用Lock(互斥锁)
最基础的同步机制,确保同一时间只有一个线程访问共享资源。
import threading
import time
# 共享资源
counter = 0
lock = threading.Lock()
def increment():
global counter
for _ in range(100000):
lock.acquire() # 获取锁
try:
counter += 1
finally:
lock.release() # 释放锁
# 或者使用with语句更简洁
def increment_with():
global counter
for _ in range(100000):
with lock: # 自动获取和释放锁
counter += 1
# 创建多个线程
threads = []
for i in range(5):
t = threading.Thread(target=increment_with)
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print(f"最终计数值: {counter}")
使用RLock(可重入锁)
允许同一个线程多次获得锁,适用于递归调用或嵌套锁的场景。
import threading
class BankAccount:
def __init__(self, balance=0):
self.balance = balance
self.lock = threading.RLock() # 使用RLock
def deposit(self, amount):
with self.lock:
self.balance += amount
print(f"存款 {amount}, 余额: {self.balance}")
def withdraw(self, amount):
with self.lock:
if self.balance >= amount:
self.balance -= amount
print(f"取款 {amount}, 余额: {self.balance}")
else:
print(f"余额不足,当前余额: {self.balance}")
def transfer(self, other_account, amount):
# 需要同时锁定两个账户,使用RLock避免死锁
with self.lock:
self.withdraw(amount)
with other_account.lock:
other_account.deposit(amount)
# 使用示例
account1 = BankAccount(1000)
account2 = BankAccount(500)
t1 = threading.Thread(target=account1.transfer, args=(account2, 300))
t2 = threading.Thread(target=account2.transfer, args=(account1, 200))
t1.start()
t2.start()
t1.join()
t2.join()
使用Semaphore(信号量)
控制同时访问资源的线程数量。
import threading
import time
import random
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
print(f"工作线程 {worker_id} 正在等待...")
with semaphore:
print(f"工作线程 {worker_id} 获得资源")
time.sleep(random.randint(1, 3)) # 模拟工作
print(f"工作线程 {worker_id} 释放资源")
# 创建10个工作线程
threads = []
for i in range(10):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
for t in threads:
t.join()
使用Event(事件)
用于线程间的通信,一个线程可以等待另一个线程的信号。
import threading
import time
event = threading.Event()
def waiter():
print("等待者: 等待事件触发...")
event.wait() # 等待事件
print("等待者: 事件已触发,继续执行")
def setter():
print("设置者: 5秒后触发事件")
time.sleep(5)
event.set() # 设置事件
print("设置者: 事件已设置")
# 创建线程
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
使用Condition(条件变量)
更复杂的同步机制,可以实现生产者-消费者模式。
import threading
import time
import random
class ProductQueue:
def __init__(self, max_size=5):
self.queue = []
self.max_size = max_size
self.condition = threading.Condition()
def produce(self, item):
with self.condition:
while len(self.queue) >= self.max_size:
print(f"队列已满,生产者等待...")
self.condition.wait()
self.queue.append(item)
print(f"生产 {item}, 队列: {self.queue}")
self.condition.notify_all() # 通知所有等待的消费者
def consume(self):
with self.condition:
while len(self.queue) == 0:
print(f"队列为空,消费者等待...")
self.condition.wait()
item = self.queue.pop(0)
print(f"消费 {item}, 队列: {self.queue}")
self.condition.notify_all() # 通知所有等待的生产者
return item
# 生产者和消费者函数
def producer(queue):
for i in range(10):
queue.produce(f"产品-{i}")
time.sleep(random.random())
def consumer(queue):
for _ in range(10):
queue.consume()
time.sleep(random.random())
# 主程序
product_queue = ProductQueue(max_size=3)
producer_thread = threading.Thread(target=producer, args=(product_queue,))
consumer_thread = threading.Thread(target=consumer, args=(product_queue,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()
使用Barrier(屏障)
让多个线程在某个点同步,所有线程都到达后才继续执行。
import threading
import time
# 创建一个需要3个线程都到达的屏障
barrier = threading.Barrier(3)
def race(racer_id):
time.sleep(2) # 模拟准备时间
print(f"选手 {racer_id} 已到达起跑线")
barrier.wait() # 等待所有选手
print(f"选手 {racer_id} 开始跑步!")
# 创建3位选手
racers = []
for i in range(3):
t = threading.Thread(target=race, args=(i,))
racers.append(t)
t.start()
for t in racers:
t.join()
实用示例:线程安全的计数器
import threading
import time
class ThreadSafeCounter:
def __init__(self):
self.count = 0
self.lock = threading.Lock()
self.event = threading.Event()
def increment(self):
with self.lock:
self.count += 1
if self.count == 10: # 达到10时触发事件
self.event.set()
return self.count
def wait_for_target(self, target=10):
print(f"等待计数达到 {target}...")
self.event.wait()
print(f"计数已达到 {target}!")
return self.count
def worker(counter, worker_id):
for _ in range(3):
count = counter.increment()
print(f"工作线程 {worker_id}: 当前计数 = {count}")
time.sleep(0.5)
# 使用示例
counter = ThreadSafeCounter()
# 创建等待线程
waiter = threading.Thread(target=counter.wait_for_target, args=(10,))
waiter.start()
# 创建工作线程
workers = []
for i in range(5):
t = threading.Thread(target=worker, args=(counter, i))
workers.append(t)
t.start()
for t in workers:
t.join()
waiter.join()
print(f"最终计数: {counter.count}")
选择合适的同步机制
| 同步机制 | 适用场景 | 特点 |
|---|---|---|
| Lock | 基本互斥访问 | 简单,不可重入 |
| RLock | 递归锁定、嵌套锁定 | 可重入,避免死锁 |
| Semaphore | 控制并发数 | 限流,资源池 |
| Event | 简单线程通信 | 一次性信号 |
| Condition | 复杂状态变化 | 多条件等待/通知 |
| Barrier | 多线程同步点 | 等待所有线程到达 |
建议:
- 优先使用高级抽象(如
with语句)管理锁 - 避免使用全局锁,尽量细粒度锁定
- 防止死锁:保持一致的锁获取顺序,或使用可重入锁
- 考虑使用
concurrent.futures模块,它提供了更高级的并发控制