本文目录导读:

在Python中保证任务一致性(通常指数据一致性、事务一致性或任务执行的原子性)是一个重要话题,根据你的具体场景,有几种常见的方法:
数据库事务(最基础)
对于数据库操作,使用事务保证ACID特性:
import sqlite3
def transfer_money(from_account, to_account, amount):
conn = sqlite3.connect('bank.db')
try:
# 开始事务
conn.execute("BEGIN TRANSACTION")
# 扣钱
conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_account))
# 加钱
conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_account))
# 提交事务
conn.commit()
print("转账成功")
except Exception as e:
# 回滚事务
conn.rollback()
print(f"转账失败: {e}")
finally:
conn.close()
使用上下文管理器(推荐)
Python的contextlib可以封装事务逻辑:
from contextlib import contextmanager
@contextmanager
def transaction(conn):
try:
conn.execute("BEGIN")
yield # 在这里执行业务逻辑
conn.commit()
except Exception:
conn.rollback()
raise
def transfer_money_safe(from_account, to_account, amount):
conn = get_db_connection()
with transaction(conn):
conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?",
(amount, from_account))
conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_account))
消息队列 + 幂等性(分布式场景)
使用Redis或RabbitMQ等消息队列:
import redis
import json
class IdempotentTaskProcessor:
def __init__(self):
self.redis_client = redis.Redis()
self.processed_tasks = set()
def process_task_with_idempotency(self, task_id, task_func, *args, **kwargs):
# 检查是否已经处理过
if self.redis_client.sismember('processed_tasks', task_id):
print(f"任务 {task_id} 已经处理过,跳过")
return
try:
# 执行任务
result = task_func(*args, **kwargs)
# 标记为已处理
self.redis_client.sadd('processed_tasks', task_id)
# 设置过期时间,避免无限增长
self.redis_client.expire('processed_tasks', 86400)
return result
except Exception as e:
# 处理失败,不标记,允许重试
print(f"任务执行失败: {e}")
raise
使用Python的multiprocessing锁
from multiprocessing import Lock, Process
import time
class ConsistentTaskManager:
def __init__(self):
self.lock = Lock()
self.state = {}
def update_state(self, key, value):
with self.lock: # 保证原子性
self.state[key] = value
time.sleep(0.1) # 模拟耗时操作
return True
def process_task(self, task_id, data):
if self.lock.acquire(block=True):
try:
# 执行任务
print(f"处理任务 {task_id}: {data}")
self.state[f"task_{task_id}"] = "completed"
finally:
self.lock.release()
使用threading.RLock(线程安全)
import threading
class ThreadSafeCounter:
def __init__(self):
self.count = 0
self.lock = threading.RLock() # 可重入锁
def increment(self):
with self.lock:
self.count += 1
def decrement(self):
with self.lock:
self.count -= 1
def get_count(self):
with self.lock:
return self.count
# 确保任务序列化
class SequentialTaskExecutor:
def __init__(self):
self.lock = threading.Lock()
self.task_queue = []
def execute_task(self, task_func):
with self.lock:
return task_func()
完整案例:银行转账系统
import threading
import sqlite3
from contextlib import contextmanager
from typing import Optional
class ConsistentBankSystem:
def __init__(self, db_path: str = "bank.db"):
self.db_path = db_path
self.lock = threading.Lock()
self._init_database()
def _init_database(self):
conn = sqlite3.connect(self.db_path)
conn.execute("""
CREATE TABLE IF NOT EXISTS accounts (
id INTEGER PRIMARY KEY,
name TEXT NOT NULL,
balance REAL NOT NULL DEFAULT 0.0,
version INTEGER NOT NULL DEFAULT 1 -- 乐观锁版本号
)
""")
conn.commit()
conn.close()
@contextmanager
def _get_connection(self):
conn = sqlite3.connect(self.db_path)
conn.execute("PRAGMA journal_mode=WAL") # Write-Ahead Logging
try:
yield conn
finally:
conn.close()
def transfer(self, from_id: int, to_id: int, amount: float) -> bool:
with self.lock: # 防止并发转账
with self._get_connection() as conn:
try:
# 开启事务
conn.execute("BEGIN TRANSACTION")
# 乐观锁:检查版本号
cursor = conn.execute(
"SELECT balance, version FROM accounts WHERE id = ?",
(from_id,)
)
from_balance, from_version = cursor.fetchone()
if from_balance < amount:
raise ValueError("余额不足")
# 使用乐观锁更新
rows_affected = conn.execute(
"""UPDATE accounts
SET balance = balance - ?, version = version + 1
WHERE id = ? AND version = ?""",
(amount, from_id, from_version)
).rowcount
if rows_affected != 1:
raise ValueError("版本冲突,请重试")
# 更新目标账户
conn.execute(
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
(amount, to_id)
)
# 记录交易日志(用于幂等性)
conn.execute(
"INSERT INTO transaction_log (from_id, to_id, amount, timestamp) VALUES (?, ?, ?, ?)",
(from_id, to_id, amount, int(time.time()))
)
conn.commit()
return True
except Exception as e:
conn.rollback()
print(f"转账失败: {e}")
return False
def get_balance(self, account_id: int) -> Optional[float]:
with self._get_connection() as conn:
cursor = conn.execute(
"SELECT balance FROM accounts WHERE id = ?",
(account_id,)
)
row = cursor.fetchone()
return row[0] if row else None
# 使用示例
if __name__ == "__main__":
bank = ConsistentBankSystem()
# 创建测试账户
conn = sqlite3.connect("bank.db")
conn.execute("INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (1, 'Alice', 1000.0)")
conn.execute("INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (2, 'Bob', 500.0)")
conn.commit()
conn.close()
# 并发转账测试
def test_concurrent_transfer():
success = bank.transfer(1, 2, 200)
print(f"转账结果: {'成功' if success else '失败'}")
threads = []
for _ in range(10):
t = threading.Thread(target=test_concurrent_transfer)
threads.append(t)
t.start()
for t in threads:
t.join()
print(f"Alice余额: {bank.get_balance(1)}")
print(f"Bob余额: {bank.get_balance(2)}")
| 原则 | 实现方式 | 适用场景 |
|---|---|---|
| 原子性 | 事务、锁、消息队列 | 数据库操作、文件操作 |
| 一致性 | 事务、乐观锁、校验 | 数据更新、转账等 |
| 隔离性 | 锁、版本号、序列化 | 并发访问 |
| 持久性 | WAL、日志、备份 | 数据持久化 |
| 幂等性 | 唯一ID、状态标记 | 消息重试、分布式系统 |
| 顺序性 | 锁、队列、CAS | 任务执行顺序 |
选择建议:
- 简单场景:使用
threading.Lock或数据库事务 - 复杂分布式:使用消息队列+幂等性设计
- 高并发:使用乐观锁+版本号
- 需要回滚:使用事务+异常处理
关键是要根据你的业务场景和一致性要求(强一致性 vs 最终一致性)选择合适的方案。