Python案例如何处理数据库事务?

wen python案例 9

本文目录导读:

Python案例如何处理数据库事务?

  1. 案例1:使用 sqlite3 处理事务(自动/手动)
  2. 案例2:使用 mysql-connector-python 处理事务
  3. 案例3:使用 psycopg2 处理 PostgreSQL 事务
  4. 案例4:使用 SQLAlchemy ORM 处理事务
  5. 事务处理的核心要点
  6. 常见错误与避免

在Python中处理数据库事务,通常使用数据库驱动库(如 sqlite3mysql-connector-pythonpsycopg2 等)或 ORM 框架(如 SQLAlchemy、Django ORM),事务的核心是原子性:一组操作要么全部成功,要么全部失败回滚。

下面通过几个常见的案例来说明如何正确处理事务。


案例1:使用 sqlite3 处理事务(自动/手动)

sqlite3 默认是自动提交模式,每条 SQL 语句独立提交,要使用事务,需要显式控制 BEGINCOMMIT/ROLLBACK

import sqlite3
def transfer_funds(from_acc, to_acc, amount):
    conn = sqlite3.connect('bank.db')
    cursor = conn.cursor()
    try:
        # 手动开始事务
        cursor.execute("BEGIN")
        # 扣钱
        cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_acc))
        # 加钱
        cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_acc))
        # 提交事务
        conn.commit()
        print("转账成功")
    except Exception as e:
        # 回滚事务
        conn.rollback()
        print(f"转账失败,回滚事务: {e}")
    finally:
        conn.close()
# 使用 with 语句(推荐)
def safe_transfer(from_acc, to_acc, amount):
    conn = sqlite3.connect('bank.db')
    try:
        # with conn 会自动在退出时 commit(无异常)或 rollback(有异常)
        with conn:
            conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_acc))
            conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_acc))
        print("转账成功")
    except Exception as e:
        print(f"转账失败: {e}")
    finally:
        conn.close()

关键点

  • 使用 conn.commit() 提交,conn.rollback() 回滚。
  • with conn: 上下文管理器会自动处理事务(sqlite3 特有)。

案例2:使用 mysql-connector-python 处理事务

MySQL 默认是自动提交,需要关闭自动提交或手动控制事务。

import mysql.connector
from mysql.connector import Error
def batch_insert_orders(orders_data):
    conn = None
    try:
        conn = mysql.connector.connect(
            host='localhost',
            database='shop',
            user='root',
            password='password'
        )
        # 关闭自动提交
        conn.autocommit = False
        cursor = conn.cursor()
        # 执行多条插入
        for order in orders_data:
            cursor.execute(
                "INSERT INTO orders (user_id, product_id, quantity) VALUES (%s, %s, %s)",
                (order['user_id'], order['product_id'], order['quantity'])
            )
        # 全部成功则提交
        conn.commit()
        print(f"成功插入 {len(orders_data)} 条订单")
    except Error as e:
        if conn:
            conn.rollback()
            print(f"事务回滚: {e}")
    finally:
        if conn and conn.is_connected():
            cursor.close()
            conn.close()

案例3:使用 psycopg2 处理 PostgreSQL 事务

PostgreSQL 驱动默认不是自动提交,但连接建立后处于一个隐式事务中,通常显式控制更安全。

import psycopg2
from psycopg2 import sql
def update_inventory_and_create_order(user_id, product_id, quantity):
    conn = None
    try:
        conn = psycopg2.connect(
            host='localhost',
            database='store',
            user='app_user',
            password='secret'
        )
        cur = conn.cursor()
        # 检查库存
        cur.execute("SELECT stock FROM products WHERE id = %s FOR UPDATE", (product_id,))
        stock = cur.fetchone()[0]
        if stock < quantity:
            raise ValueError("库存不足")
        # 扣减库存
        cur.execute("UPDATE products SET stock = stock - %s WHERE id = %s", (quantity, product_id))
        # 创建订单
        cur.execute("INSERT INTO orders (user_id, product_id, quantity) VALUES (%s, %s, %s)",
                    (user_id, product_id, quantity))
        # 提交事务
        conn.commit()
        print("操作成功")
    except Exception as e:
        if conn:
            conn.rollback()
            print(f"操作失败,回滚: {e}")
    finally:
        if conn:
            cur.close()
            conn.close()

注意FOR UPDATE 用于行级锁,防止并发时的超卖问题。


案例4:使用 SQLAlchemy ORM 处理事务

SQLAlchemy 提供了 session 对象管理事务,更上层更易用。

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
# 假设已定义 UserAccount 模型
from models import UserAccount, engine
Session = sessionmaker(bind=engine)
def transfer_orm(from_user_id, to_user_id, amount):
    session = Session()
    try:
        from_user = session.query(UserAccount).filter_by(id=from_user_id).with_for_update().one()
        to_user = session.query(UserAccount).filter_by(id=to_user_id).with_for_update().one()
        if from_user.balance < amount:
            raise ValueError("余额不足")
        from_user.balance -= amount
        to_user.balance += amount
        # 提交事务
        session.commit()
        print("转账成功")
    except SQLAlchemyError as e:
        session.rollback()
        print(f"事务回滚: {e}")
    except ValueError as e:
        session.rollback()
        print(f"业务检查失败: {e}")
    finally:
        session.close()

更推荐的方式:使用 session.begin() 上下文管理器

def transfer_orm_v2(from_user_id, to_user_id, amount):
    session = Session()
    with session.begin():
        # 如果这里抛出异常,自动 rollback
        from_user = session.query(UserAccount).filter_by(id=from_user_id).with_for_update().one()
        to_user = session.query(UserAccount).filter_by(id=to_user_id).with_for_update().one()
        if from_user.balance < amount:
            raise ValueError("余额不足")
        from_user.balance -= amount
        to_user.balance += amount
    # 正常退出上下文会自动 commit

事务处理的核心要点

概念 说明
ACID 原子性、一致性、隔离性、持久性
自动提交 部分数据库默认每条 SQL 单独提交,需要显式关闭
回滚 发生任何异常时执行 rollback(),避免脏数据
隔离级别 根据场景选择 READ COMMITTEDREPEATABLE READSERIALIZABLE
SELECT ... FOR UPDATE 防止并发修改
嵌套事务 大多数 Python DB API 不支持原生嵌套,通常通过保存点模拟

常见错误与避免

  1. 忘记提交或回滚 → 事务长时间挂起,导致锁或连接池耗尽。
  2. 捕获了所有异常却不回滚 → 数据可能部分提交。
  3. 在循环中频繁提交 → 失去事务的原子性,应批量提交。
  4. 未使用 withtry-finally → 异常时连接可能泄露。

希望这些案例能帮助你正确处理 Python 中的数据库事务,如果你有具体的数据库类型或框架(如 Django、Tortoise-ORM),也可以进一步给出针对性代码。

抱歉,评论功能暂时关闭!