Python案例怎么实现线程同步?

wen python案例 10

本文目录导读:

Python案例怎么实现线程同步?

  1. 使用Lock(互斥锁)
  2. 使用RLock(可重入锁)
  3. 使用Semaphore(信号量)
  4. 使用Event(事件)
  5. 使用Condition(条件变量)
  6. 使用Barrier(屏障)
  7. 实用示例:线程安全的计数器
  8. 选择合适的同步机制

在Python中实现线程同步,主要使用threading模块提供的多种同步机制,我来介绍几种常见的方法:

使用Lock(互斥锁)

最基础的同步机制,确保同一时间只有一个线程访问共享资源。

import threading
import time
# 共享资源
counter = 0
lock = threading.Lock()
def increment():
    global counter
    for _ in range(100000):
        lock.acquire()  # 获取锁
        try:
            counter += 1
        finally:
            lock.release()  # 释放锁
# 或者使用with语句更简洁
def increment_with():
    global counter
    for _ in range(100000):
        with lock:  # 自动获取和释放锁
            counter += 1
# 创建多个线程
threads = []
for i in range(5):
    t = threading.Thread(target=increment_with)
    threads.append(t)
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()
print(f"最终计数值: {counter}")

使用RLock(可重入锁)

允许同一个线程多次获得锁,适用于递归调用或嵌套锁的场景。

import threading
class BankAccount:
    def __init__(self, balance=0):
        self.balance = balance
        self.lock = threading.RLock()  # 使用RLock
    def deposit(self, amount):
        with self.lock:
            self.balance += amount
            print(f"存款 {amount}, 余额: {self.balance}")
    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                self.balance -= amount
                print(f"取款 {amount}, 余额: {self.balance}")
            else:
                print(f"余额不足,当前余额: {self.balance}")
    def transfer(self, other_account, amount):
        # 需要同时锁定两个账户,使用RLock避免死锁
        with self.lock:
            self.withdraw(amount)
            with other_account.lock:
                other_account.deposit(amount)
# 使用示例
account1 = BankAccount(1000)
account2 = BankAccount(500)
t1 = threading.Thread(target=account1.transfer, args=(account2, 300))
t2 = threading.Thread(target=account2.transfer, args=(account1, 200))
t1.start()
t2.start()
t1.join()
t2.join()

使用Semaphore(信号量)

控制同时访问资源的线程数量。

import threading
import time
import random
# 最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
    print(f"工作线程 {worker_id} 正在等待...")
    with semaphore:
        print(f"工作线程 {worker_id} 获得资源")
        time.sleep(random.randint(1, 3))  # 模拟工作
        print(f"工作线程 {worker_id} 释放资源")
# 创建10个工作线程
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

使用Event(事件)

用于线程间的通信,一个线程可以等待另一个线程的信号。

import threading
import time
event = threading.Event()
def waiter():
    print("等待者: 等待事件触发...")
    event.wait()  # 等待事件
    print("等待者: 事件已触发,继续执行")
def setter():
    print("设置者: 5秒后触发事件")
    time.sleep(5)
    event.set()  # 设置事件
    print("设置者: 事件已设置")
# 创建线程
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()

使用Condition(条件变量)

更复杂的同步机制,可以实现生产者-消费者模式。

import threading
import time
import random
class ProductQueue:
    def __init__(self, max_size=5):
        self.queue = []
        self.max_size = max_size
        self.condition = threading.Condition()
    def produce(self, item):
        with self.condition:
            while len(self.queue) >= self.max_size:
                print(f"队列已满,生产者等待...")
                self.condition.wait()
            self.queue.append(item)
            print(f"生产 {item}, 队列: {self.queue}")
            self.condition.notify_all()  # 通知所有等待的消费者
    def consume(self):
        with self.condition:
            while len(self.queue) == 0:
                print(f"队列为空,消费者等待...")
                self.condition.wait()
            item = self.queue.pop(0)
            print(f"消费 {item}, 队列: {self.queue}")
            self.condition.notify_all()  # 通知所有等待的生产者
            return item
# 生产者和消费者函数
def producer(queue):
    for i in range(10):
        queue.produce(f"产品-{i}")
        time.sleep(random.random())
def consumer(queue):
    for _ in range(10):
        queue.consume()
        time.sleep(random.random())
# 主程序
product_queue = ProductQueue(max_size=3)
producer_thread = threading.Thread(target=producer, args=(product_queue,))
consumer_thread = threading.Thread(target=consumer, args=(product_queue,))
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

使用Barrier(屏障)

让多个线程在某个点同步,所有线程都到达后才继续执行。

import threading
import time
# 创建一个需要3个线程都到达的屏障
barrier = threading.Barrier(3)
def race(racer_id):
    time.sleep(2)  # 模拟准备时间
    print(f"选手 {racer_id} 已到达起跑线")
    barrier.wait()  # 等待所有选手
    print(f"选手 {racer_id} 开始跑步!")
# 创建3位选手
racers = []
for i in range(3):
    t = threading.Thread(target=race, args=(i,))
    racers.append(t)
    t.start()
for t in racers:
    t.join()

实用示例:线程安全的计数器

import threading
import time
class ThreadSafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()
        self.event = threading.Event()
    def increment(self):
        with self.lock:
            self.count += 1
            if self.count == 10:  # 达到10时触发事件
                self.event.set()
        return self.count
    def wait_for_target(self, target=10):
        print(f"等待计数达到 {target}...")
        self.event.wait()
        print(f"计数已达到 {target}!")
        return self.count
def worker(counter, worker_id):
    for _ in range(3):
        count = counter.increment()
        print(f"工作线程 {worker_id}: 当前计数 = {count}")
        time.sleep(0.5)
# 使用示例
counter = ThreadSafeCounter()
# 创建等待线程
waiter = threading.Thread(target=counter.wait_for_target, args=(10,))
waiter.start()
# 创建工作线程
workers = []
for i in range(5):
    t = threading.Thread(target=worker, args=(counter, i))
    workers.append(t)
    t.start()
for t in workers:
    t.join()
waiter.join()
print(f"最终计数: {counter.count}")

选择合适的同步机制

同步机制 适用场景 特点
Lock 基本互斥访问 简单,不可重入
RLock 递归锁定、嵌套锁定 可重入,避免死锁
Semaphore 控制并发数 限流,资源池
Event 简单线程通信 一次性信号
Condition 复杂状态变化 多条件等待/通知
Barrier 多线程同步点 等待所有线程到达

建议

  • 优先使用高级抽象(如with语句)管理锁
  • 避免使用全局锁,尽量细粒度锁定
  • 防止死锁:保持一致的锁获取顺序,或使用可重入锁
  • 考虑使用concurrent.futures模块,它提供了更高级的并发控制

抱歉,评论功能暂时关闭!