Python案例中的信号量怎么用?

wen python案例 3

本文目录导读:

Python案例中的信号量怎么用?

  1. 基础用法:限制并发线程数
  2. 使用上下文管理器(推荐方式)
  3. 生产者-消费者模式
  4. 数据库连接池示例
  5. BoundedSemaphore(有界信号量)

在Python中,信号量(Semaphore)是用于控制多线程/多进程并发访问共享资源数量的同步原语,下面通过几个案例来说明其用法。

基础用法:限制并发线程数

import threading
import time
import random
# 创建一个信号量,最多允许3个线程同时访问
semaphore = threading.Semaphore(3)
def worker(worker_id):
    print(f"工作线程 {worker_id} 正在等待信号量...")
    semaphore.acquire()  # 获取信号量
    try:
        print(f"工作线程 {worker_id} 获得信号量,开始工作")
        time.sleep(random.uniform(1, 3))  # 模拟工作
        print(f"工作线程 {worker_id} 完成工作")
    finally:
        # 释放信号量,让其他线程可以使用
        semaphore.release()
# 创建10个工作线程
threads = []
for i in range(10):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()
print("所有工作完成")

使用上下文管理器(推荐方式)

import threading
import time
semaphore = threading.Semaphore(2)
def web_scraper(task_id):
    # 使用with语句自动管理acquire和release
    with semaphore:
        print(f"任务 {task_id} 开始爬取...")
        time.sleep(1)
        print(f"任务 {task_id} 爬取完成")
# 启动多个爬虫任务
tasks = []
for i in range(6):
    t = threading.Thread(target=web_scraper, args=(i,))
    tasks.append(t)
    t.start()
for t in tasks:
    t.join()

生产者-消费者模式

import threading
import time
import random
# 限制缓冲区最大容量
buffer = []
buffer_semaphore = threading.Semaphore(5)  # 最多存放5个产品
mutex = threading.Lock()  # 互斥锁保护缓冲区
def producer():
    for i in range(10):
        product = f"产品-{i}"
        # 尝试放入缓冲区
        buffer_semaphore.acquire()  # 信号量减1
        with mutex:
            buffer.append(product)
            print(f"生产者: 生产 {product}, 缓冲区大小: {len(buffer)}")
        time.sleep(random.random())
def consumer():
    for i in range(10):
        # 从缓冲区取出产品
        buffer_semaphore.release()  # 信号量加1
        with mutex:
            if buffer:
                product = buffer.pop(0)
                print(f"消费者: 消费 {product}, 缓冲区大小: {len(buffer)}")
        time.sleep(random.random())
# 创建生产者和消费者线程
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
producer_thread.join()
consumer_thread.join()

数据库连接池示例

import threading
import time
from queue import Queue
class DatabaseConnectionPool:
    def __init__(self, max_connections=5):
        self.max_connections = max_connections
        self.semaphore = threading.Semaphore(max_connections)
        self.connections = Queue(maxsize=max_connections)
        # 初始化连接池
        for i in range(max_connections):
            self.connections.put(f"连接-{i}")
    def get_connection(self):
        """获取数据库连接"""
        self.semaphore.acquire()  # 信号量减1
        return self.connections.get()
    def return_connection(self, conn):
        """归还数据库连接"""
        self.connections.put(conn)
        self.semaphore.release()  # 信号量加1
# 使用连接池
pool = DatabaseConnectionPool(max_connections=3)
def query_database(query_id):
    conn = pool.get_connection()
    try:
        print(f"查询 {query_id}: 使用 {conn}")
        time.sleep(1)
        print(f"查询 {query_id}: 完成")
    finally:
        pool.return_connection(conn)
# 模拟多个数据库查询
threads = []
for i in range(8):
    t = threading.Thread(target=query_database, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()

BoundedSemaphore(有界信号量)

import threading
import time
# BoundedSemaphore不会超过初始值
bounded_sem = threading.BoundedSemaphore(2)
def task(name):
    bounded_sem.acquire()
    print(f"{name} 获得资源")
    time.sleep(0.5)
    bounded_sem.release()
    print(f"{name} 释放资源")
# 测试
t1 = threading.Thread(target=task, args=("Task-1",))
t2 = threading.Thread(target=task, args=("Task-2",))
t3 = threading.Thread(target=task, args=("Task-3",))
t1.start()
t2.start()
t3.start()
t1.join()
t2.join()
t3.join()
print("所有任务完成")
# 如果释放太多次会引发ValueError
try:
    bounded_sem.release()  # 这会引发错误
    bounded_sem.release()  # 超过初始值2
    bounded_sem.release()  # 这里会抛出异常
except ValueError as e:
    print(f"错误: {e}")
  1. 信号量创建threading.Semaphore(n) 创建初始值为n的信号量
  2. 获取信号量acquire()with semaphore:
  3. 释放信号量release()with语句自动释放
  4. BoundedSemaphore:确保不会超过初始值,防止编程错误
  5. 适用场景:限制并发访问数、连接池、资源池等

信号量适合用于限流场景,确保同时访问资源的线程/进程数不超过预设值。

抱歉,评论功能暂时关闭!