本文目录导读:

- 案例1:使用 sqlite3 处理事务(自动/手动)
- 案例2:使用 mysql-connector-python 处理事务
- 案例3:使用 psycopg2 处理 PostgreSQL 事务
- 案例4:使用 SQLAlchemy ORM 处理事务
- 事务处理的核心要点
- 常见错误与避免
在Python中处理数据库事务,通常使用数据库驱动库(如 sqlite3、mysql-connector-python、psycopg2 等)或 ORM 框架(如 SQLAlchemy、Django ORM),事务的核心是原子性:一组操作要么全部成功,要么全部失败回滚。
下面通过几个常见的案例来说明如何正确处理事务。
案例1:使用 sqlite3 处理事务(自动/手动)
sqlite3 默认是自动提交模式,每条 SQL 语句独立提交,要使用事务,需要显式控制 BEGIN 和 COMMIT/ROLLBACK。
import sqlite3
def transfer_funds(from_acc, to_acc, amount):
conn = sqlite3.connect('bank.db')
cursor = conn.cursor()
try:
# 手动开始事务
cursor.execute("BEGIN")
# 扣钱
cursor.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_acc))
# 加钱
cursor.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_acc))
# 提交事务
conn.commit()
print("转账成功")
except Exception as e:
# 回滚事务
conn.rollback()
print(f"转账失败,回滚事务: {e}")
finally:
conn.close()
# 使用 with 语句(推荐)
def safe_transfer(from_acc, to_acc, amount):
conn = sqlite3.connect('bank.db')
try:
# with conn 会自动在退出时 commit(无异常)或 rollback(有异常)
with conn:
conn.execute("UPDATE accounts SET balance = balance - ? WHERE id = ?", (amount, from_acc))
conn.execute("UPDATE accounts SET balance = balance + ? WHERE id = ?", (amount, to_acc))
print("转账成功")
except Exception as e:
print(f"转账失败: {e}")
finally:
conn.close()
关键点:
- 使用
conn.commit()提交,conn.rollback()回滚。 with conn:上下文管理器会自动处理事务(sqlite3特有)。
案例2:使用 mysql-connector-python 处理事务
MySQL 默认是自动提交,需要关闭自动提交或手动控制事务。
import mysql.connector
from mysql.connector import Error
def batch_insert_orders(orders_data):
conn = None
try:
conn = mysql.connector.connect(
host='localhost',
database='shop',
user='root',
password='password'
)
# 关闭自动提交
conn.autocommit = False
cursor = conn.cursor()
# 执行多条插入
for order in orders_data:
cursor.execute(
"INSERT INTO orders (user_id, product_id, quantity) VALUES (%s, %s, %s)",
(order['user_id'], order['product_id'], order['quantity'])
)
# 全部成功则提交
conn.commit()
print(f"成功插入 {len(orders_data)} 条订单")
except Error as e:
if conn:
conn.rollback()
print(f"事务回滚: {e}")
finally:
if conn and conn.is_connected():
cursor.close()
conn.close()
案例3:使用 psycopg2 处理 PostgreSQL 事务
PostgreSQL 驱动默认不是自动提交,但连接建立后处于一个隐式事务中,通常显式控制更安全。
import psycopg2
from psycopg2 import sql
def update_inventory_and_create_order(user_id, product_id, quantity):
conn = None
try:
conn = psycopg2.connect(
host='localhost',
database='store',
user='app_user',
password='secret'
)
cur = conn.cursor()
# 检查库存
cur.execute("SELECT stock FROM products WHERE id = %s FOR UPDATE", (product_id,))
stock = cur.fetchone()[0]
if stock < quantity:
raise ValueError("库存不足")
# 扣减库存
cur.execute("UPDATE products SET stock = stock - %s WHERE id = %s", (quantity, product_id))
# 创建订单
cur.execute("INSERT INTO orders (user_id, product_id, quantity) VALUES (%s, %s, %s)",
(user_id, product_id, quantity))
# 提交事务
conn.commit()
print("操作成功")
except Exception as e:
if conn:
conn.rollback()
print(f"操作失败,回滚: {e}")
finally:
if conn:
cur.close()
conn.close()
注意:FOR UPDATE 用于行级锁,防止并发时的超卖问题。
案例4:使用 SQLAlchemy ORM 处理事务
SQLAlchemy 提供了 session 对象管理事务,更上层更易用。
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
from sqlalchemy.exc import SQLAlchemyError
# 假设已定义 UserAccount 模型
from models import UserAccount, engine
Session = sessionmaker(bind=engine)
def transfer_orm(from_user_id, to_user_id, amount):
session = Session()
try:
from_user = session.query(UserAccount).filter_by(id=from_user_id).with_for_update().one()
to_user = session.query(UserAccount).filter_by(id=to_user_id).with_for_update().one()
if from_user.balance < amount:
raise ValueError("余额不足")
from_user.balance -= amount
to_user.balance += amount
# 提交事务
session.commit()
print("转账成功")
except SQLAlchemyError as e:
session.rollback()
print(f"事务回滚: {e}")
except ValueError as e:
session.rollback()
print(f"业务检查失败: {e}")
finally:
session.close()
更推荐的方式:使用 session.begin() 上下文管理器
def transfer_orm_v2(from_user_id, to_user_id, amount):
session = Session()
with session.begin():
# 如果这里抛出异常,自动 rollback
from_user = session.query(UserAccount).filter_by(id=from_user_id).with_for_update().one()
to_user = session.query(UserAccount).filter_by(id=to_user_id).with_for_update().one()
if from_user.balance < amount:
raise ValueError("余额不足")
from_user.balance -= amount
to_user.balance += amount
# 正常退出上下文会自动 commit
事务处理的核心要点
| 概念 | 说明 |
|---|---|
| ACID | 原子性、一致性、隔离性、持久性 |
| 自动提交 | 部分数据库默认每条 SQL 单独提交,需要显式关闭 |
| 回滚 | 发生任何异常时执行 rollback(),避免脏数据 |
| 隔离级别 | 根据场景选择 READ COMMITTED、REPEATABLE READ、SERIALIZABLE |
| 锁 | SELECT ... FOR UPDATE 防止并发修改 |
| 嵌套事务 | 大多数 Python DB API 不支持原生嵌套,通常通过保存点模拟 |
常见错误与避免
- 忘记提交或回滚 → 事务长时间挂起,导致锁或连接池耗尽。
- 捕获了所有异常却不回滚 → 数据可能部分提交。
- 在循环中频繁提交 → 失去事务的原子性,应批量提交。
- 未使用
with或try-finally→ 异常时连接可能泄露。
希望这些案例能帮助你正确处理 Python 中的数据库事务,如果你有具体的数据库类型或框架(如 Django、Tortoise-ORM),也可以进一步给出针对性代码。