本文目录导读:

MySQL批量更新脚本
基础批量更新
-- 根据条件批量更新
UPDATE users
SET status = 'active', updated_at = NOW()
WHERE last_login < DATE_SUB(NOW(), INTERVAL 30 DAY);
-- 使用CASE WHEN进行条件更新
UPDATE products
SET price = CASE
WHEN category = '电子' THEN price * 0.9
WHEN category = '服装' THEN price * 0.8
ELSE price * 0.95
END;
基于另一张表更新
-- 根据关联表数据更新 UPDATE orders o JOIN users u ON o.user_id = u.id SET o.discount = u.member_level * 0.1 WHERE u.member_level > 0;
Python批量更新脚本
import pymysql
from datetime import datetime, timedelta
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
class BatchUpdater:
def __init__(self, host, user, password, database):
self.connection = pymysql.connect(
host=host,
user=user,
password=password,
database=database,
cursorclass=pymysql.cursors.DictCursor
)
def batch_update_with_progress(self, sql, params_list, batch_size=100):
"""批量更新并显示进度"""
cursor = self.connection.cursor()
total = len(params_list)
success = 0
failed = 0
for i in range(0, total, batch_size):
batch = params_list[i:i+batch_size]
try:
cursor.executemany(sql, batch)
self.connection.commit()
success += len(batch)
progress = (i + len(batch)) / total * 100
logging.info(f"进度: {progress:.1f}% - 已更新 {success} 条记录")
except Exception as e:
self.connection.rollback()
failed += len(batch)
logging.error(f"批次更新失败: {e}")
logging.info(f"完成! 成功: {success}, 失败: {failed}")
cursor.close()
def close(self):
self.connection.close()
# 使用示例
def update_user_ages():
updater = BatchUpdater(
host='localhost',
user='root',
password='password',
database='test_db'
)
# 准备更新数据
updates = [
('user@example.com', 25),
('user2@example.com', 30),
# ... 更多数据
]
sql = "UPDATE users SET age = %s WHERE email = %s"
updater.batch_update_with_progress(sql, updates)
updater.close()
Shell脚本批量处理
#!/bin/bash
# MySQL批量更新脚本
MYSQL_USER="root"
MYSQL_PASSWORD="password"
MYSQL_DATABASE="test_db"
# 从CSV文件批量更新
update_from_csv() {
local csv_file=$1
local table_name=$2
while IFS=',' read -r id new_value; do
# 跳过标题行
[[ $id == "id" ]] && continue
mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "
UPDATE $table_name
SET column_name = '$new_value'
WHERE id = $id;
"
echo "已更新 ID: $id"
done < "$csv_file"
}
# 批量修改特定模式的数据
batch_update_pattern() {
local search_pattern=$1
local replace_pattern=$2
mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "
UPDATE products
SET name = REPLACE(name, '$search_pattern', '$replace_pattern')
WHERE name LIKE '%$search_pattern%';
"
}
# 分批次更新大量数据
batch_update_large_data() {
local batch_size=1000
local offset=0
while true; do
result=$(mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "
UPDATE orders
SET status = 'archived'
WHERE created_at < '2023-01-01'
AND status = 'completed'
LIMIT $batch_size;
" 2>&1)
rows_affected=$(echo "$result" | grep -oP 'Rows matched: \K\d+')
if [[ $rows_affected -eq 0 ]]; then
echo "更新完成"
break
fi
echo "已更新 $offset ~ $((offset + rows_affected)) 条记录"
offset=$((offset + rows_affected))
sleep 0.1 # 避免数据库压力过大
done
}
# 主函数
main() {
case "$1" in
csv)
update_from_csv "$2" "$3"
;;
pattern)
batch_update_pattern "$2" "$3"
;;
large)
batch_update_large_data
;;
*)
echo "用法: $0 {csv|pattern|large} [参数...]"
exit 1
esac
}
main "$@"
批量更新工具函数
Python工具函数
import pandas as pd
from sqlalchemy import create_engine, text
def smart_batch_update(engine, table, updates_df, key_column, batch_size=1000):
"""
智能批量更新 - 使用DataFrame
"""
with engine.connect() as conn:
transaction = conn.begin()
try:
for i in range(0, len(updates_df), batch_size):
batch = updates_df.iloc[i:i+batch_size]
# 构建批量UPDATE语句
values = []
for _, row in batch.iterrows():
set_clauses = []
for col in batch.columns:
if col != key_column:
set_clauses.append(f"{col} = :{col}_{i}")
update_sql = f"""
UPDATE {table}
SET {', '.join(set_clauses)}
WHERE {key_column} = :{key_column}_{i}
"""
values.append(row.to_dict())
# 执行更新
result = conn.execute(text(update_sql), values)
print(f"批次 {i//batch_size + 1}: 更新 {result.rowcount} 行")
transaction.commit()
print("批量更新完成")
except Exception as e:
transaction.rollback()
print(f"更新失败: {e}")
# 使用示例
def main():
engine = create_engine('mysql+pymysql://user:password@localhost/db')
# 从Excel文件读取更新数据
df = pd.read_excel('updates.xlsx')
# 执行批量更新
smart_batch_update(engine, 'users', df, 'id')
性能优化建议
# 1. 使用预处理语句
def efficient_batch_update(cursor, table, records):
"""使用预处理语句提高性能"""
placeholders = ', '.join(['%s'] * len(records[0]))
columns = ', '.join(records[0].keys())
sql = f"UPDATE {table} SET "
set_clause = ', '.join([f"{col} = VALUES({col})" for col in records[0].keys()])
# 批量处理
cursor.executemany(f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {set_clause}",
[list(r.values()) for r in records])
# 2. 分批处理大表
def batch_process_large_table(table, batch_size=10000):
"""分批处理大表避免锁表"""
min_id = 0
while True:
sql = f"""
UPDATE {table}
SET status = 'processed'
WHERE id > {min_id}
AND status = 'pending'
ORDER BY id
LIMIT {batch_size}
"""
affected = cursor.execute(sql)
if affected == 0:
break
min_id = cursor.lastrowid
connection.commit()
安全防护措施
# 1. 备份数据
def backup_before_update(connection, table, condition):
"""更新前备份数据"""
cursor = connection.cursor()
backup_table = f"{table}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
cursor.execute(f"CREATE TABLE {backup_table} AS SELECT * FROM {table} WHERE {condition}")
logging.info(f"已备份到 {backup_table}")
return backup_table
# 2. 事务回滚
def safe_batch_update(connection, sql, params_list, batch_size=100):
"""安全的批量更新,支持回滚"""
cursor = connection.cursor()
try:
connection.begin()
for i in range(0, len(params_list), batch_size):
batch = params_list[i:i+batch_size]
cursor.executemany(sql, batch)
# 确认更新
confirm = input(f"即将更新 {len(params_list)} 条记录,确认? (y/n): ")
if confirm.lower() == 'y':
connection.commit()
logging.info("更新已提交")
else:
connection.rollback()
logging.info("更新已回滚")
except Exception as e:
connection.rollback()
logging.error(f"更新失败: {e}")
raise
finally:
cursor.close()
这些脚本可以根据你的具体需求进行调整,使用时请记得:
- 始终在生产环境前测试
- 做好数据备份
- 分批处理避免数据库压力过大
- 监控执行进度和错误