Python案例怎么创建线程任务?

wen python案例 10

本文目录导读:

Python案例怎么创建线程任务?

  1. 使用 threading.Thread 直接创建
  2. 使用线程池 (ThreadPoolExecutor)
  3. 实际案例:多线程下载文件
  4. 线程安全的共享数据
  5. 线程间通信(使用 queue)
  6. 重要注意事项

在Python中创建线程任务有多种方式,我来详细介绍几种常用方法:

使用 threading.Thread 直接创建

传入函数

import threading
import time
def worker(name, delay):
    """线程要执行的任务"""
    for i in range(5):
        print(f"线程 {name} 执行第 {i+1} 次")
        time.sleep(delay)
# 创建线程
t1 = threading.Thread(target=worker, args=("线程1", 0.5))
t2 = threading.Thread(target=worker, args=("线程2", 1.0))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("所有线程执行完毕")

继承 Thread 类

import threading
import time
class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay
    def run(self):
        """重写 run 方法,定义线程执行的任务"""
        for i in range(5):
            print(f"线程 {self.name} 执行第 {i+1} 次")
            time.sleep(self.delay)
# 创建并启动线程
t1 = MyThread("线程1", 0.5)
t2 = MyThread("线程2", 1.0)
t1.start()
t2.start()
t1.join()
t2.join()

使用线程池 (ThreadPoolExecutor)

from concurrent.futures import ThreadPoolExecutor
import time
def worker(task_id, value):
    """线程任务函数"""
    time.sleep(1)
    print(f"任务 {task_id} 完成,处理值: {value}")
    return value * 2
# 方式一:自动管理线程
with ThreadPoolExecutor(max_workers=3) as executor:
    # 提交多个任务
    futures = [executor.submit(worker, i, i * 10) for i in range(5)]
    # 获取结果
    for future in futures:
        result = future.result()
        print(f"获取结果: {result}")
# 方式二:使用 map 方法
with ThreadPoolExecutor(max_workers=3) as executor:
    values = [1, 2, 3, 4, 5]
    results = executor.map(worker, range(5), values)
    for result in results:
        print(f"map 方式结果: {result}")

实际案例:多线程下载文件

import threading
import time
import random
class DownloadTask:
    """模拟文件下载任务"""
    def __init__(self, file_name, size_mb):
        self.file_name = file_name
        self.size_mb = size_mb
    def download(self, callback=None):
        """模拟下载过程"""
        downloaded = 0
        while downloaded < self.size_mb:
            time.sleep(0.5)  # 模拟下载延迟
            chunk = random.randint(1, 5)
            downloaded = min(downloaded + chunk, self.size_mb)
            progress = (downloaded / self.size_mb) * 100
            if callback:
                callback(self.file_name, progress)
        return f"{self.file_name} 下载完成"
def progress_callback(file_name, progress):
    print(f"正在下载 {file_name}: {progress:.1f}%")
def download_file(task):
    """线程执行函数"""
    result = task.download(callback=progress_callback)
    print(result)
# 创建下载任务
tasks = [
    DownloadTask("video1.mp4", 10),
    DownloadTask("video2.mp4", 15),
    DownloadTask("document.pdf", 5),
]
# 创建并启动线程
threads = []
for task in tasks:
    t = threading.Thread(target=download_file, args=(task,))
    threads.append(t)
    t.start()
# 等待所有线程完成
for t in threads:
    t.join()
print("所有下载任务完成")

线程安全的共享数据

import threading
import time
class SharedCounter:
    """线程安全的计数器"""
    def __init__(self):
        self.value = 0
        self.lock = threading.Lock()
    def increment(self):
        with self.lock:  # 使用锁确保线程安全
            current = self.value
            time.sleep(0.1)  # 模拟耗时操作
            self.value = current + 1
    def get_value(self):
        with self.lock:
            return self.value
def worker(counter, name):
    """线程任务"""
    for _ in range(10):
        counter.increment()
        print(f"{name} 将计数器增加到 {counter.get_value()}")
# 使用线程安全的计数器
counter = SharedCounter()
threads = [
    threading.Thread(target=worker, args=(counter, f"线程{i}"))
    for i in range(3)
]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"最终计数: {counter.get_value()}")

线程间通信(使用 queue)

import threading
import queue
import time
import random
def producer(q, name):
    """生产者线程"""
    for i in range(5):
        item = f"产品-{name}-{i}"
        print(f"生产者 {name} 生产: {item}")
        q.put(item)
        time.sleep(random.uniform(0.5, 2))
def consumer(q, name):
    """消费者线程"""
    while True:
        try:
            item = q.get(timeout=3)  # 3秒超时
            print(f"消费者 {name} 消费: {item}")
            time.sleep(random.uniform(1, 3))
            q.task_done()
        except queue.Empty:
            print(f"消费者 {name} 等待超时,结束")
            break
# 创建队列
task_queue = queue.Queue(maxsize=10)
# 创建生产者和消费者线程
producers = [
    threading.Thread(target=producer, args=(task_queue, f"P{i}"))
    for i in range(2)
]
consumers = [
    threading.Thread(target=consumer, args=(task_queue, f"C{i}"))
    for i in range(3)
]
# 启动所有线程
for p in producers:
    p.start()
for c in consumers:
    c.start()
# 等待所有生产者完成
for p in producers:
    p.join()
# 等待队列清空
task_queue.join()
print("所有任务完成")

重要注意事项

  1. GIL 限制:Python的GIL会限制CPU密集型任务的并行执行,适合I/O密集型任务
  2. 线程安全:共享数据时需要使用锁(lock)或使用线程安全的数据结构
  3. 死锁避免:使用锁时注意避免死锁,可以设置超时或使用锁的顺序
  4. 资源管理:使用 with 语句或 try/finally 确保资源正确释放

选择哪种方式取决于你的具体需求:

  • 少量简单任务:threading.Thread 直接创建
  • 大量任务或需要结果:ThreadPoolExecutor
  • 复杂任务逻辑:继承 Thread
  • 生产者-消费者模式:配合 queue.Queue 使用

抱歉,评论功能暂时关闭!