如何用实用脚本批量修改数据库记录?

wen 实用脚本 1

本文目录导读:

如何用实用脚本批量修改数据库记录?

  1. MySQL批量更新脚本
  2. Python批量更新脚本
  3. Shell脚本批量处理
  4. 批量更新工具函数
  5. 性能优化建议
  6. 安全防护措施

MySQL批量更新脚本

基础批量更新

-- 根据条件批量更新
UPDATE users 
SET status = 'active', updated_at = NOW()
WHERE last_login < DATE_SUB(NOW(), INTERVAL 30 DAY);
-- 使用CASE WHEN进行条件更新
UPDATE products 
SET price = CASE 
    WHEN category = '电子' THEN price * 0.9
    WHEN category = '服装' THEN price * 0.8
    ELSE price * 0.95
END;

基于另一张表更新

-- 根据关联表数据更新
UPDATE orders o
JOIN users u ON o.user_id = u.id
SET o.discount = u.member_level * 0.1
WHERE u.member_level > 0;

Python批量更新脚本

import pymysql
from datetime import datetime, timedelta
import logging
# 配置日志
logging.basicConfig(level=logging.INFO)
class BatchUpdater:
    def __init__(self, host, user, password, database):
        self.connection = pymysql.connect(
            host=host,
            user=user,
            password=password,
            database=database,
            cursorclass=pymysql.cursors.DictCursor
        )
    def batch_update_with_progress(self, sql, params_list, batch_size=100):
        """批量更新并显示进度"""
        cursor = self.connection.cursor()
        total = len(params_list)
        success = 0
        failed = 0
        for i in range(0, total, batch_size):
            batch = params_list[i:i+batch_size]
            try:
                cursor.executemany(sql, batch)
                self.connection.commit()
                success += len(batch)
                progress = (i + len(batch)) / total * 100
                logging.info(f"进度: {progress:.1f}% - 已更新 {success} 条记录")
            except Exception as e:
                self.connection.rollback()
                failed += len(batch)
                logging.error(f"批次更新失败: {e}")
        logging.info(f"完成! 成功: {success}, 失败: {failed}")
        cursor.close()
    def close(self):
        self.connection.close()
# 使用示例
def update_user_ages():
    updater = BatchUpdater(
        host='localhost',
        user='root',
        password='password',
        database='test_db'
    )
    # 准备更新数据
    updates = [
        ('user@example.com', 25),
        ('user2@example.com', 30),
        # ... 更多数据
    ]
    sql = "UPDATE users SET age = %s WHERE email = %s"
    updater.batch_update_with_progress(sql, updates)
    updater.close()

Shell脚本批量处理

#!/bin/bash
# MySQL批量更新脚本
MYSQL_USER="root"
MYSQL_PASSWORD="password"
MYSQL_DATABASE="test_db"
# 从CSV文件批量更新
update_from_csv() {
    local csv_file=$1
    local table_name=$2
    while IFS=',' read -r id new_value; do
        # 跳过标题行
        [[ $id == "id" ]] && continue
        mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "
            UPDATE $table_name 
            SET column_name = '$new_value' 
            WHERE id = $id;
        "
        echo "已更新 ID: $id"
    done < "$csv_file"
}
# 批量修改特定模式的数据
batch_update_pattern() {
    local search_pattern=$1
    local replace_pattern=$2
    mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "
        UPDATE products 
        SET name = REPLACE(name, '$search_pattern', '$replace_pattern')
        WHERE name LIKE '%$search_pattern%';
    "
}
# 分批次更新大量数据
batch_update_large_data() {
    local batch_size=1000
    local offset=0
    while true; do
        result=$(mysql -u$MYSQL_USER -p$MYSQL_PASSWORD $MYSQL_DATABASE -e "
            UPDATE orders 
            SET status = 'archived' 
            WHERE created_at < '2023-01-01' 
            AND status = 'completed'
            LIMIT $batch_size;
        " 2>&1)
        rows_affected=$(echo "$result" | grep -oP 'Rows matched: \K\d+')
        if [[ $rows_affected -eq 0 ]]; then
            echo "更新完成"
            break
        fi
        echo "已更新 $offset ~ $((offset + rows_affected)) 条记录"
        offset=$((offset + rows_affected))
        sleep 0.1  # 避免数据库压力过大
    done
}
# 主函数
main() {
    case "$1" in
        csv)
            update_from_csv "$2" "$3"
            ;;
        pattern)
            batch_update_pattern "$2" "$3"
            ;;
        large)
            batch_update_large_data
            ;;
        *)
            echo "用法: $0 {csv|pattern|large} [参数...]"
            exit 1
    esac
}
main "$@"

批量更新工具函数

Python工具函数

import pandas as pd
from sqlalchemy import create_engine, text
def smart_batch_update(engine, table, updates_df, key_column, batch_size=1000):
    """
    智能批量更新 - 使用DataFrame
    """
    with engine.connect() as conn:
        transaction = conn.begin()
        try:
            for i in range(0, len(updates_df), batch_size):
                batch = updates_df.iloc[i:i+batch_size]
                # 构建批量UPDATE语句
                values = []
                for _, row in batch.iterrows():
                    set_clauses = []
                    for col in batch.columns:
                        if col != key_column:
                            set_clauses.append(f"{col} = :{col}_{i}")
                    update_sql = f"""
                        UPDATE {table}
                        SET {', '.join(set_clauses)}
                        WHERE {key_column} = :{key_column}_{i}
                    """
                    values.append(row.to_dict())
                # 执行更新
                result = conn.execute(text(update_sql), values)
                print(f"批次 {i//batch_size + 1}: 更新 {result.rowcount} 行")
            transaction.commit()
            print("批量更新完成")
        except Exception as e:
            transaction.rollback()
            print(f"更新失败: {e}")
# 使用示例
def main():
    engine = create_engine('mysql+pymysql://user:password@localhost/db')
    # 从Excel文件读取更新数据
    df = pd.read_excel('updates.xlsx')
    # 执行批量更新
    smart_batch_update(engine, 'users', df, 'id')

性能优化建议

# 1. 使用预处理语句
def efficient_batch_update(cursor, table, records):
    """使用预处理语句提高性能"""
    placeholders = ', '.join(['%s'] * len(records[0]))
    columns = ', '.join(records[0].keys())
    sql = f"UPDATE {table} SET "
    set_clause = ', '.join([f"{col} = VALUES({col})" for col in records[0].keys()])
    # 批量处理
    cursor.executemany(f"INSERT INTO {table} ({columns}) VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {set_clause}", 
                      [list(r.values()) for r in records])
# 2. 分批处理大表
def batch_process_large_table(table, batch_size=10000):
    """分批处理大表避免锁表"""
    min_id = 0
    while True:
        sql = f"""
            UPDATE {table}
            SET status = 'processed'
            WHERE id > {min_id} 
            AND status = 'pending'
            ORDER BY id
            LIMIT {batch_size}
        """
        affected = cursor.execute(sql)
        if affected == 0:
            break
        min_id = cursor.lastrowid
        connection.commit()

安全防护措施

# 1. 备份数据
def backup_before_update(connection, table, condition):
    """更新前备份数据"""
    cursor = connection.cursor()
    backup_table = f"{table}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
    cursor.execute(f"CREATE TABLE {backup_table} AS SELECT * FROM {table} WHERE {condition}")
    logging.info(f"已备份到 {backup_table}")
    return backup_table
# 2. 事务回滚
def safe_batch_update(connection, sql, params_list, batch_size=100):
    """安全的批量更新,支持回滚"""
    cursor = connection.cursor()
    try:
        connection.begin()
        for i in range(0, len(params_list), batch_size):
            batch = params_list[i:i+batch_size]
            cursor.executemany(sql, batch)
        # 确认更新
        confirm = input(f"即将更新 {len(params_list)} 条记录,确认? (y/n): ")
        if confirm.lower() == 'y':
            connection.commit()
            logging.info("更新已提交")
        else:
            connection.rollback()
            logging.info("更新已回滚")
    except Exception as e:
        connection.rollback()
        logging.error(f"更新失败: {e}")
        raise
    finally:
        cursor.close()

这些脚本可以根据你的具体需求进行调整,使用时请记得:

  • 始终在生产环境前测试
  • 做好数据备份
  • 分批处理避免数据库压力过大
  • 监控执行进度和错误

抱歉,评论功能暂时关闭!