Python案例怎么保证任务一致性?

wen python案例 74

本文目录导读:

Python案例怎么保证任务一致性?

  1. 数据库事务(最基础)
  2. 使用上下文管理器(推荐)
  3. 消息队列 + 幂等性(分布式场景)
  4. 使用Python的multiprocessing
  5. 使用threading.RLock(线程安全)
  6. 完整案例:银行转账系统

在Python中保证任务一致性(通常指数据一致性、事务一致性或任务执行的原子性)是一个重要话题,根据你的具体场景,有几种常见的方法:

数据库事务(最基础)

对于数据库操作,使用事务保证ACID特性:

import sqlite3
def transfer_money(from_account, to_account, amount):
    conn = sqlite3.connect('bank.db')
    try:
        # 开始事务
        conn.execute("BEGIN TRANSACTION")
        # 扣钱
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", 
                    (amount, from_account))
        # 加钱
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", 
                    (amount, to_account))
        # 提交事务
        conn.commit()
        print("转账成功")
    except Exception as e:
        # 回滚事务
        conn.rollback()
        print(f"转账失败: {e}")
    finally:
        conn.close()

使用上下文管理器(推荐)

Python的contextlib可以封装事务逻辑:

from contextlib import contextmanager
@contextmanager
def transaction(conn):
    try:
        conn.execute("BEGIN")
        yield  # 在这里执行业务逻辑
        conn.commit()
    except Exception:
        conn.rollback()
        raise
def transfer_money_safe(from_account, to_account, amount):
    conn = get_db_connection()
    with transaction(conn):
        conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", 
                    (amount, from_account))
        conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", 
                    (amount, to_account))

消息队列 + 幂等性(分布式场景)

使用Redis或RabbitMQ等消息队列:

import redis
import json
class IdempotentTaskProcessor:
    def __init__(self):
        self.redis_client = redis.Redis()
        self.processed_tasks = set()
    def process_task_with_idempotency(self, task_id, task_func, *args, **kwargs):
        # 检查是否已经处理过
        if self.redis_client.sismember('processed_tasks', task_id):
            print(f"任务 {task_id} 已经处理过,跳过")
            return
        try:
            # 执行任务
            result = task_func(*args, **kwargs)
            # 标记为已处理
            self.redis_client.sadd('processed_tasks', task_id)
            # 设置过期时间,避免无限增长
            self.redis_client.expire('processed_tasks', 86400)
            return result
        except Exception as e:
            # 处理失败,不标记,允许重试
            print(f"任务执行失败: {e}")
            raise

使用Python的multiprocessing

from multiprocessing import Lock, Process
import time
class ConsistentTaskManager:
    def __init__(self):
        self.lock = Lock()
        self.state = {}
    def update_state(self, key, value):
        with self.lock:  # 保证原子性
            self.state[key] = value
            time.sleep(0.1)  # 模拟耗时操作
            return True
    def process_task(self, task_id, data):
        if self.lock.acquire(block=True):
            try:
                # 执行任务
                print(f"处理任务 {task_id}: {data}")
                self.state[f"task_{task_id}"] = "completed"
            finally:
                self.lock.release()

使用threading.RLock(线程安全)

import threading
class ThreadSafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = threading.RLock()  # 可重入锁
    def increment(self):
        with self.lock:
            self.count += 1
    def decrement(self):
        with self.lock:
            self.count -= 1
    def get_count(self):
        with self.lock:
            return self.count
# 确保任务序列化
class SequentialTaskExecutor:
    def __init__(self):
        self.lock = threading.Lock()
        self.task_queue = []
    def execute_task(self, task_func):
        with self.lock:
            return task_func()

完整案例:银行转账系统

import threading
import sqlite3
from contextlib import contextmanager
from typing import Optional
class ConsistentBankSystem:
    def __init__(self, db_path: str = "bank.db"):
        self.db_path = db_path
        self.lock = threading.Lock()
        self._init_database()
    def _init_database(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("""
            CREATE TABLE IF NOT EXISTS accounts (
                id INTEGER PRIMARY KEY,
                name TEXT NOT NULL,
                balance REAL NOT NULL DEFAULT 0.0,
                version INTEGER NOT NULL DEFAULT 1  -- 乐观锁版本号
            )
        """)
        conn.commit()
        conn.close()
    @contextmanager
    def _get_connection(self):
        conn = sqlite3.connect(self.db_path)
        conn.execute("PRAGMA journal_mode=WAL")  # Write-Ahead Logging
        try:
            yield conn
        finally:
            conn.close()
    def transfer(self, from_id: int, to_id: int, amount: float) -> bool:
        with self.lock:  # 防止并发转账
            with self._get_connection() as conn:
                try:
                    # 开启事务
                    conn.execute("BEGIN TRANSACTION")
                    # 乐观锁:检查版本号
                    cursor = conn.execute(
                        "SELECT balance, version FROM accounts WHERE id = ?",
                        (from_id,)
                    )
                    from_balance, from_version = cursor.fetchone()
                    if from_balance < amount:
                        raise ValueError("余额不足")
                    # 使用乐观锁更新
                    rows_affected = conn.execute(
                        """UPDATE accounts 
                           SET balance = balance - ?, version = version + 1 
                           WHERE id = ? AND version = ?""",
                        (amount, from_id, from_version)
                    ).rowcount
                    if rows_affected != 1:
                        raise ValueError("版本冲突,请重试")
                    # 更新目标账户
                    conn.execute(
                        "UPDATE accounts SET balance = balance + ? WHERE id = ?",
                        (amount, to_id)
                    )
                    # 记录交易日志(用于幂等性)
                    conn.execute(
                        "INSERT INTO transaction_log (from_id, to_id, amount, timestamp) VALUES (?, ?, ?, ?)",
                        (from_id, to_id, amount, int(time.time()))
                    )
                    conn.commit()
                    return True
                except Exception as e:
                    conn.rollback()
                    print(f"转账失败: {e}")
                    return False
    def get_balance(self, account_id: int) -> Optional[float]:
        with self._get_connection() as conn:
            cursor = conn.execute(
                "SELECT balance FROM accounts WHERE id = ?",
                (account_id,)
            )
            row = cursor.fetchone()
            return row[0] if row else None
# 使用示例
if __name__ == "__main__":
    bank = ConsistentBankSystem()
    # 创建测试账户
    conn = sqlite3.connect("bank.db")
    conn.execute("INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (1, 'Alice', 1000.0)")
    conn.execute("INSERT OR IGNORE INTO accounts (id, name, balance) VALUES (2, 'Bob', 500.0)")
    conn.commit()
    conn.close()
    # 并发转账测试
    def test_concurrent_transfer():
        success = bank.transfer(1, 2, 200)
        print(f"转账结果: {'成功' if success else '失败'}")
    threads = []
    for _ in range(10):
        t = threading.Thread(target=test_concurrent_transfer)
        threads.append(t)
        t.start()
    for t in threads:
        t.join()
    print(f"Alice余额: {bank.get_balance(1)}")
    print(f"Bob余额: {bank.get_balance(2)}")
原则 实现方式 适用场景
原子性 事务、锁、消息队列 数据库操作、文件操作
一致性 事务、乐观锁、校验 数据更新、转账等
隔离性 锁、版本号、序列化 并发访问
持久性 WAL、日志、备份 数据持久化
幂等性 唯一ID、状态标记 消息重试、分布式系统
顺序性 锁、队列、CAS 任务执行顺序

选择建议

  • 简单场景:使用threading.Lock或数据库事务
  • 复杂分布式:使用消息队列+幂等性设计
  • 高并发:使用乐观锁+版本号
  • 需要回滚:使用事务+异常处理

关键是要根据你的业务场景和一致性要求(强一致性 vs 最终一致性)选择合适的方案。

抱歉,评论功能暂时关闭!