本文目录导读:

在Python中创建线程任务有多种方式,我来详细介绍几种常用方法:
使用 threading.Thread 直接创建
传入函数
import threading
import time
def worker(name, delay):
"""线程要执行的任务"""
for i in range(5):
print(f"线程 {name} 执行第 {i+1} 次")
time.sleep(delay)
# 创建线程
t1 = threading.Thread(target=worker, args=("线程1", 0.5))
t2 = threading.Thread(target=worker, args=("线程2", 1.0))
# 启动线程
t1.start()
t2.start()
# 等待线程结束
t1.join()
t2.join()
print("所有线程执行完毕")
继承 Thread 类
import threading
import time
class MyThread(threading.Thread):
def __init__(self, name, delay):
super().__init__()
self.name = name
self.delay = delay
def run(self):
"""重写 run 方法,定义线程执行的任务"""
for i in range(5):
print(f"线程 {self.name} 执行第 {i+1} 次")
time.sleep(self.delay)
# 创建并启动线程
t1 = MyThread("线程1", 0.5)
t2 = MyThread("线程2", 1.0)
t1.start()
t2.start()
t1.join()
t2.join()
使用线程池 (ThreadPoolExecutor)
from concurrent.futures import ThreadPoolExecutor
import time
def worker(task_id, value):
"""线程任务函数"""
time.sleep(1)
print(f"任务 {task_id} 完成,处理值: {value}")
return value * 2
# 方式一:自动管理线程
with ThreadPoolExecutor(max_workers=3) as executor:
# 提交多个任务
futures = [executor.submit(worker, i, i * 10) for i in range(5)]
# 获取结果
for future in futures:
result = future.result()
print(f"获取结果: {result}")
# 方式二:使用 map 方法
with ThreadPoolExecutor(max_workers=3) as executor:
values = [1, 2, 3, 4, 5]
results = executor.map(worker, range(5), values)
for result in results:
print(f"map 方式结果: {result}")
实际案例:多线程下载文件
import threading
import time
import random
class DownloadTask:
"""模拟文件下载任务"""
def __init__(self, file_name, size_mb):
self.file_name = file_name
self.size_mb = size_mb
def download(self, callback=None):
"""模拟下载过程"""
downloaded = 0
while downloaded < self.size_mb:
time.sleep(0.5) # 模拟下载延迟
chunk = random.randint(1, 5)
downloaded = min(downloaded + chunk, self.size_mb)
progress = (downloaded / self.size_mb) * 100
if callback:
callback(self.file_name, progress)
return f"{self.file_name} 下载完成"
def progress_callback(file_name, progress):
print(f"正在下载 {file_name}: {progress:.1f}%")
def download_file(task):
"""线程执行函数"""
result = task.download(callback=progress_callback)
print(result)
# 创建下载任务
tasks = [
DownloadTask("video1.mp4", 10),
DownloadTask("video2.mp4", 15),
DownloadTask("document.pdf", 5),
]
# 创建并启动线程
threads = []
for task in tasks:
t = threading.Thread(target=download_file, args=(task,))
threads.append(t)
t.start()
# 等待所有线程完成
for t in threads:
t.join()
print("所有下载任务完成")
线程安全的共享数据
import threading
import time
class SharedCounter:
"""线程安全的计数器"""
def __init__(self):
self.value = 0
self.lock = threading.Lock()
def increment(self):
with self.lock: # 使用锁确保线程安全
current = self.value
time.sleep(0.1) # 模拟耗时操作
self.value = current + 1
def get_value(self):
with self.lock:
return self.value
def worker(counter, name):
"""线程任务"""
for _ in range(10):
counter.increment()
print(f"{name} 将计数器增加到 {counter.get_value()}")
# 使用线程安全的计数器
counter = SharedCounter()
threads = [
threading.Thread(target=worker, args=(counter, f"线程{i}"))
for i in range(3)
]
for t in threads:
t.start()
for t in threads:
t.join()
print(f"最终计数: {counter.get_value()}")
线程间通信(使用 queue)
import threading
import queue
import time
import random
def producer(q, name):
"""生产者线程"""
for i in range(5):
item = f"产品-{name}-{i}"
print(f"生产者 {name} 生产: {item}")
q.put(item)
time.sleep(random.uniform(0.5, 2))
def consumer(q, name):
"""消费者线程"""
while True:
try:
item = q.get(timeout=3) # 3秒超时
print(f"消费者 {name} 消费: {item}")
time.sleep(random.uniform(1, 3))
q.task_done()
except queue.Empty:
print(f"消费者 {name} 等待超时,结束")
break
# 创建队列
task_queue = queue.Queue(maxsize=10)
# 创建生产者和消费者线程
producers = [
threading.Thread(target=producer, args=(task_queue, f"P{i}"))
for i in range(2)
]
consumers = [
threading.Thread(target=consumer, args=(task_queue, f"C{i}"))
for i in range(3)
]
# 启动所有线程
for p in producers:
p.start()
for c in consumers:
c.start()
# 等待所有生产者完成
for p in producers:
p.join()
# 等待队列清空
task_queue.join()
print("所有任务完成")
重要注意事项
- GIL 限制:Python的GIL会限制CPU密集型任务的并行执行,适合I/O密集型任务
- 线程安全:共享数据时需要使用锁(lock)或使用线程安全的数据结构
- 死锁避免:使用锁时注意避免死锁,可以设置超时或使用锁的顺序
- 资源管理:使用
with语句或 try/finally 确保资源正确释放
选择哪种方式取决于你的具体需求:
- 少量简单任务:
threading.Thread直接创建 - 大量任务或需要结果:
ThreadPoolExecutor - 复杂任务逻辑:继承
Thread类 - 生产者-消费者模式:配合
queue.Queue使用