Python案例:如何实现数据迁移?

wen python案例 28

本文目录导读:

Python案例:如何实现数据迁移?

  1. 数据库到数据库迁移
  2. CSV/Excel文件到数据库
  3. API到数据库迁移
  4. 实时数据同步
  5. 最佳实践建议
  6. 完整迁移示例

下面介绍用Python实现数据迁移的几种常见案例和方法。

1. 数据库到数据库迁移

1.1 MySQL到PostgreSQL迁移

import pymysql
import psycopg2
from psycopg2.extras import RealDictCursor
class MySQLtoPostgreSQL:
    """Copy tables from a MySQL database into an existing PostgreSQL schema.

    ``mysql_config`` and ``pg_config`` are dicts with ``host`` and
    ``database`` keys and optional ``user``, ``password`` and ``port`` keys.
    The target table must already exist in PostgreSQL with columns named
    like the MySQL columns (it is TRUNCATEd and INSERTed into, never created).
    """

    def __init__(self, mysql_config, pg_config):
        self.mysql_config = mysql_config
        self.pg_config = pg_config

    def connect_mysql(self):
        """Open a MySQL connection (charset utf8mb4) from ``self.mysql_config``."""
        return pymysql.connect(
            host=self.mysql_config['host'],
            # user/password/port are optional so configs without them do not
            # fail with KeyError; the values used are PyMySQL's own defaults.
            user=self.mysql_config.get('user'),
            password=self.mysql_config.get('password', ''),
            port=self.mysql_config.get('port', 3306),
            database=self.mysql_config['database'],
            charset='utf8mb4'
        )

    def connect_postgresql(self):
        """Open a PostgreSQL connection from ``self.pg_config``."""
        # psycopg2 drops keyword arguments whose value is None, so missing
        # optional keys fall back to libpq defaults.
        return psycopg2.connect(
            host=self.pg_config['host'],
            user=self.pg_config.get('user'),
            password=self.pg_config.get('password'),
            port=self.pg_config.get('port'),
            database=self.pg_config['database']
        )

    def migrate_table(self, table_name, batch_size=1000):
        """Copy every row of ``table_name`` from MySQL into PostgreSQL.

        The target table is TRUNCATEd first.  Rows are then read from MySQL
        through one unbuffered ``SSDictCursor`` query and inserted into
        PostgreSQL in batches of ``batch_size``.  Each batch is committed on
        its own, so a failure part-way leaves the batches already committed
        in the target.  Work that was not committed is discarded when the
        PostgreSQL connection is closed.

        Args:
            table_name: table name in both databases.  It is interpolated into
                SQL unescaped, so pass trusted identifiers only.
            batch_size: rows fetched and inserted per round-trip (>= 1).

        Returns:
            The number of rows copied.

        Raises:
            ValueError: if ``batch_size`` < 1.
        """
        if batch_size < 1:
            raise ValueError(f"batch_size must be >= 1, got {batch_size}")
        from psycopg2.extras import execute_batch

        mysql_conn = self.connect_mysql()
        pg_conn = None
        try:
            # Opened inside the try so the MySQL connection is closed even
            # when the PostgreSQL connection cannot be established.
            pg_conn = self.connect_postgresql()
            pg_cursor = pg_conn.cursor()

            # Read the column list (MySQL's DESCRIBE returns a 'Field' per column).
            with mysql_conn.cursor(pymysql.cursors.DictCursor) as meta_cursor:
                meta_cursor.execute(f"DESCRIBE {table_name}")
                columns = [col['Field'] for col in meta_cursor.fetchall()]

            # NOTE(review): column names are not quoted on the PostgreSQL side,
            # so reserved words or mixed-case names will not match; quote them
            # if the target schema was created with quoted identifiers.
            placeholders = ','.join(['%s'] * len(columns))
            columns_str = ','.join(columns)
            insert_sql = f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"

            pg_cursor.execute(f"TRUNCATE TABLE {table_name}")

            migrated = 0
            # A single streamed SELECT replaces LIMIT/OFFSET paging.  Without an
            # ORDER BY, OFFSET pages are not guaranteed to be disjoint, and
            # every page re-scans all the rows before it.
            with mysql_conn.cursor(pymysql.cursors.SSDictCursor) as src_cursor:
                src_cursor.execute(f"SELECT * FROM {table_name}")
                while True:
                    rows = src_cursor.fetchmany(batch_size)
                    if not rows:
                        break
                    execute_batch(
                        pg_cursor,
                        insert_sql,
                        [[row[col] for col in columns] for row in rows],
                        page_size=batch_size,
                    )
                    pg_conn.commit()
                    migrated += len(rows)
                    print(f"已迁移 {migrated} 条记录到 {table_name}")
            # Commit the TRUNCATE even when the source table was empty;
            # otherwise closing the connection would silently undo it.
            pg_conn.commit()
            return migrated
        finally:
            if pg_conn is not None:
                pg_conn.close()
            mysql_conn.close()
# Usage example: copy the `users` table from a local MySQL database into a
# local PostgreSQL database.  The credentials are placeholders.
mysql_config = dict(
    host='localhost',
    user='root',
    password='password',
    database='source_db',
)
pg_config = dict(
    host='localhost',
    user='postgres',
    password='password',
    database='target_db',
)
migrator = MySQLtoPostgreSQL(mysql_config, pg_config)
migrator.migrate_table('users')

2. CSV/Excel文件到数据库

2.1 CSV/Excel导入到MySQL

import pandas as pd
from sqlalchemy import create_engine
import logging
class CSVtoDatabase:
    """Load CSV and Excel files into a database through a SQLAlchemy engine."""

    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def csv_to_mysql(self, csv_file, table_name, chunksize=10000):
        """Append the rows of ``csv_file`` to ``table_name``, ``chunksize`` rows at a time.

        The target is whatever database ``self.engine`` points at.  Errors are
        logged and then re-raised.
        """
        try:
            # With ``chunksize`` the reader is a context manager; ``with``
            # closes the file even when a chunk fails to parse or insert.
            with pd.read_csv(csv_file, chunksize=chunksize) as reader:
                for chunk in reader:
                    chunk.to_sql(
                        name=table_name,
                        con=self.engine,
                        if_exists='append',
                        index=False,
                        method='multi'
                    )
                    self.logger.info(f"已写入 {len(chunk)} 条记录到 {table_name}")
            self.logger.info(f"CSV文件 {csv_file} 成功导入到 {table_name}")
        except Exception as e:
            self.logger.error(f"导入失败: {e}")
            raise

    def excel_to_database(self, excel_file, target_db='mysql'):
        """Write every sheet of ``excel_file`` to a table named after the sheet.

        Each sheet is cleaned with :meth:`clean_data` and replaces any existing
        table of the same name.  ``target_db`` is currently unused; it is kept
        for call compatibility.
        """
        excel_data = pd.read_excel(excel_file, sheet_name=None)  # dict: sheet name -> DataFrame
        for sheet_name, df in excel_data.items():
            df = self.clean_data(df)
            df.to_sql(
                name=sheet_name,
                con=self.engine,
                if_exists='replace',
                index=False
            )
            self.logger.info(f"Sheet {sheet_name} 已导入,共 {len(df)} 条记录")

    def clean_data(self, df):
        """Return a cleaned copy of ``df`` that is safe to write with ``to_sql``.

        - Missing values are replaced by ``''``.
        - Column labels are converted to ``str``, and every character outside
          ``[a-zA-Z0-9_]`` becomes ``_``.  Labels that collide after this get
          a ``_1``, ``_2``, ... suffix so the SQL table has unique columns.
        - Leading and trailing whitespace is stripped from string cells only;
          non-string values in the same column are left untouched.

        The input DataFrame is not modified.
        """
        df = df.fillna('')

        # regex=True is required: since pandas 2.0 str.replace defaults to a
        # literal match, which would leave the names unchanged.
        sanitized = df.columns.astype(str).str.replace(r'[^a-zA-Z0-9_]', '_', regex=True)
        used = set()
        unique_names = []
        for name in sanitized:
            candidate, suffix = name, 1
            while candidate in used:
                candidate = f"{name}_{suffix}"
                suffix += 1
            used.add(candidate)
            unique_names.append(candidate)
        df.columns = unique_names

        # Strip element-wise: ``Series.str.strip()`` would turn the non-string
        # values of a mixed column (e.g. numbers after fillna('')) into NaN.
        for col in df.columns:
            if pd.api.types.is_string_dtype(df[col].dtype):
                df[col] = df[col].map(lambda v: v.strip() if isinstance(v, str) else v)
        return df
# Usage example: append data.csv to the `users` table, then load each sheet
# of data.xlsx into a table named after the sheet.
db_connection = 'mysql+pymysql://root:password@localhost:3306/target_db'
importer = CSVtoDatabase(connection_string=db_connection)
importer.csv_to_mysql(csv_file='data.csv', table_name='users')
importer.excel_to_database(excel_file='data.xlsx')

3. API到数据库迁移

3.1 REST API数据导入SQLite

import requests
import json
from datetime import datetime, timedelta
import sqlite3
from typing import List, Dict
import time
class APIToDatabase:
    """Copy paginated JSON records from a REST API into a local SQLite file.

    Each endpoint is stored in its own table, with every column typed TEXT.
    Every run is recorded in the ``migration_log`` table of the same database.
    """

    def __init__(self, api_base_url, db_path='migration.db'):
        self.api_base_url = api_base_url
        self.db_path = db_path
        self.session = requests.Session()

    def init_database(self):
        """Create the ``migration_log`` table in ``self.db_path`` if it is missing."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS migration_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_endpoint TEXT,
                    records_imported INTEGER,
                    start_time TIMESTAMP,
                    end_time TIMESTAMP,
                    status TEXT
                )
            ''')
            conn.commit()
        finally:
            conn.close()  # close even if the DDL fails

    def fetch_paginated_data(self, endpoint: str, page_size: int = 100, *,
                             raise_on_error: bool = False,
                             request_interval: float = 0.5) -> List[Dict]:
        """Fetch every page of ``endpoint`` and return the concatenated records.

        Pages are requested as ``?page=N&size=page_size``, starting at page 1,
        until a page returns fewer than ``page_size`` records.  A page body may
        be a JSON list of records, or an object whose ``data`` key holds that
        list (the ``{data: [...], total: ..., page: ...}`` shape).

        Args:
            endpoint: path appended to ``api_base_url``.
            page_size: value sent as the ``size`` query parameter.
            raise_on_error: if False (the default), a failed request is printed
                and the records fetched so far are returned.  If True, the
                ``requests`` exception is re-raised, so a truncated result is
                never mistaken for a complete one.
            request_interval: seconds to sleep between page requests.

        Raises:
            requests.exceptions.RequestException: only when ``raise_on_error``.
            ValueError: if a page body has neither of the shapes above.
        """
        all_data = []
        page = 1
        while True:
            try:
                response = self.session.get(
                    f"{self.api_base_url}/{endpoint}",
                    params={'page': page, 'size': page_size},
                    timeout=30
                )
                response.raise_for_status()
                data = response.json()
            except requests.exceptions.RequestException as e:
                print(f"请求失败: {e}")
                if raise_on_error:
                    raise
                break
            items = self._extract_items(data)
            all_data.extend(items)
            # A short page means this was the last one.
            if len(items) < page_size:
                break
            page += 1
            time.sleep(request_interval)  # avoid hammering the API
        return all_data

    @staticmethod
    def _extract_items(payload):
        """Return the list of records in one decoded page body."""
        # ``payload.get('data', payload)`` would fail on a bare list, and for an
        # object without a list under ``data`` it would iterate the object's keys.
        if isinstance(payload, list):
            return payload
        if isinstance(payload, dict) and isinstance(payload.get('data'), list):
            return payload['data']
        raise ValueError(f"unexpected API response shape: {type(payload).__name__}")

    @staticmethod
    def _quote_identifier(name):
        """Quote ``name`` as an SQLite identifier, doubling any embedded quotes."""
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _to_text(value):
        """Convert one JSON value to the value stored in a TEXT column."""
        if value is None:
            return None  # JSON null or missing field -> SQL NULL, not the string 'None'
        if isinstance(value, (dict, list)):
            return json.dumps(value, ensure_ascii=False)
        return str(value)

    @staticmethod
    def _write_log(cursor, endpoint, records, start_time, status):
        """Append one row to ``migration_log``; ``end_time`` is the current time."""
        # Timestamps are stored as ISO text with a space separator, the same
        # format sqlite3's default datetime adapter (deprecated in 3.12) produced.
        cursor.execute('''
            INSERT INTO migration_log
            (api_endpoint, records_imported, start_time, end_time, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (endpoint, records, start_time.isoformat(' '),
              datetime.now().isoformat(' '), status))

    def migrate_api_data(self, endpoint: str, table_name: str = None):
        """Replace a SQLite table with every record served by ``endpoint``.

        The table (default name: ``endpoint`` with ``/`` replaced by ``_``) is
        dropped and recreated with one TEXT column per key found in any record.
        Values are stored as follows:

        - JSON null and missing keys: SQL NULL.
        - Nested objects and arrays: JSON text.
        - Any other value: ``str(value)``.

        The drop, create and inserts run in a single transaction, and a failed
        page request aborts the run.  On any error the previous table contents
        are therefore kept.  The outcome is appended to ``migration_log``;
        errors are printed, not raised.
        """
        self.init_database()
        start_time = datetime.now()
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            # raise_on_error: a failed page must abort the run instead of
            # replacing the table with a truncated data set.
            data = self.fetch_paginated_data(endpoint, raise_on_error=True)
            if not table_name:
                table_name = endpoint.replace('/', '_')
            if data:
                # Union of keys over all records, in first-seen order, so that
                # fields missing from the first record are not dropped.
                columns = list(dict.fromkeys(key for item in data for key in item))
                table_sql = self._quote_identifier(table_name)
                column_sql = [self._quote_identifier(col) for col in columns]
                column_defs = ', '.join(f"{col} TEXT" for col in column_sql)
                placeholders = ', '.join('?' for _ in columns)
                insert_sql = f"INSERT INTO {table_sql} ({', '.join(column_sql)}) VALUES ({placeholders})"
                # Explicit BEGIN: the sqlite3 module only opens a transaction
                # implicitly before DML, so the DROP would otherwise be
                # committed immediately and could not be rolled back.
                cursor.execute("BEGIN")
                cursor.execute(f"DROP TABLE IF EXISTS {table_sql}")
                cursor.execute(f"CREATE TABLE {table_sql} ({column_defs})")
                cursor.executemany(
                    insert_sql,
                    ([self._to_text(item.get(col)) for col in columns] for item in data),
                )
                conn.commit()
            self._write_log(cursor, endpoint, len(data), start_time, 'success')
            conn.commit()
            print(f"迁移完成: {endpoint} -> {len(data)} 条记录")
        except Exception as e:
            # Undo any uncommitted DROP/CREATE/INSERT first; otherwise the
            # commit of the failure log below would persist them.
            conn.rollback()
            self._write_log(cursor, endpoint, 0, start_time, f'failed: {str(e)}')
            conn.commit()
            print(f"迁移失败: {e}")
        finally:
            conn.close()
# Usage example: import the `users` and `orders` endpoints into the default
# SQLite file (migration.db).
migrator = APIToDatabase(api_base_url='https://api.example.com/v1')
migrator.migrate_api_data(endpoint='users')
migrator.migrate_api_data(endpoint='orders')

4. 实时数据同步

4.1 使用Redis列表作为中间队列

import redis
import json
from concurrent.futures import ThreadPoolExecutor
import threading
import time
class RealTimeSync:
    """Continuously move JSON items from a Redis list on one server to a list on another."""

    def __init__(self, source_config, target_config):
        self.source_redis = redis.StrictRedis(**source_config)
        self.target_redis = redis.StrictRedis(**target_config)
        self.running = True
        self.executor = ThreadPoolExecutor(max_workers=4)

    def start_sync(self, source_key, target_key, batch_size=100):
        """Poll ``source_key`` every 0.1 s and hand each batch to the thread pool.

        Up to ``batch_size`` items are taken from the head of the source list
        and trimmed from it in one pipeline; ``write_to_target`` then pushes
        them onto ``target_key``.  Blocks until :meth:`stop` clears ``running``.
        """
        print(f"开始同步 {source_key} -> {target_key}")
        # Remembered as a fallback for callers of write_to_target that do not
        # pass a source key.
        self.source_key = source_key
        while self.running:
            try:
                pipeline = self.source_redis.pipeline()
                pipeline.lrange(source_key, 0, batch_size - 1)
                pipeline.ltrim(source_key, batch_size, -1)
                items = pipeline.execute()[0]
                if items:
                    try:
                        self.executor.submit(
                            self.write_to_target,
                            items,
                            target_key,
                            source_key
                        )
                    except RuntimeError:
                        # Executor already shut down (stop() was called).  The
                        # batch has been trimmed from the source, so put it
                        # back rather than dropping it.
                        self.source_redis.lpush(source_key, *reversed(items))
                time.sleep(0.1)
            except Exception as e:
                print(f"同步错误: {e}")
                time.sleep(1)

    def write_to_target(self, items, target_key, source_key=None):
        """Decode each item as JSON and RPUSH it onto ``target_key``.

        If the write fails, the whole batch is pushed back onto the head of
        ``source_key`` in its original order.  When ``source_key`` is not
        given, the key recorded by the last ``start_sync`` call is used.
        """
        try:
            pipeline = self.target_redis.pipeline()
            for item in items:
                data = json.loads(item)
                # Transform ``data`` here if needed.
                pipeline.rpush(target_key, json.dumps(data))
            pipeline.execute()
            print(f"已同步 {len(items)} 条数据")
        except Exception as e:
            print(f"写入目标失败: {e}")
            requeue_key = source_key if source_key is not None else getattr(self, 'source_key', None)
            if requeue_key is None:
                print("无法放回源队列: 未指定 source_key")
                return
            try:
                # LPUSH with several values leaves the last one at the head,
                # so push in reverse to restore the batch's original order.
                self.source_redis.lpush(requeue_key, *reversed(items))
            except Exception as requeue_error:
                print(f"放回源队列失败: {requeue_error}")

    def stop(self):
        """Stop the polling loop and wait for pending writes to finish."""
        self.running = False
        self.executor.shutdown(wait=True)
# Usage example: sync in the foreground until Ctrl+C, then shut down and let
# pending writes finish.
sync = RealTimeSync(
    source_config=dict(host='source-redis', port=6379),
    target_config=dict(host='target-redis', port=6379),
)
try:
    sync.start_sync(source_key='source_queue', target_key='target_queue')
except KeyboardInterrupt:
    sync.stop()

5. 最佳实践建议

5.1 错误处理和重试机制

from functools import wraps
import time
import random
def retry(max_attempts=3, delay=1, backoff=2, *, jitter=1.0, exceptions=(Exception,)):
    """Decorator that retries the wrapped call with exponential back-off.

    The call is attempted up to ``max_attempts`` times.  After the n-th
    failure (n < max_attempts) the wrapper sleeps
    ``delay * backoff ** (n - 1)`` seconds plus a random jitter in
    ``[0, jitter]``, so the first wait is ``delay``.  The exception from the
    final attempt is re-raised with its original traceback.

    Args:
        max_attempts: total number of attempts; must be >= 1.
        delay: seconds to wait after the first failure.
        backoff: factor applied to the wait after each further failure.
        jitter: upper bound, in seconds, of the uniform random extra wait.
        exceptions: exception types that trigger a retry; any other
            exception propagates immediately.

    Raises:
        ValueError: if ``max_attempts`` < 1.  Otherwise the wrapped function
            would never run and the wrapper would silently return None.
    """
    if max_attempts < 1:
        raise ValueError(f"max_attempts must be >= 1, got {max_attempts}")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_attempts:
                        raise
                    wait_time = delay * (backoff ** (attempt - 1)) + random.uniform(0, jitter)
                    print(f"重试 {attempt}: {e}, 等待 {wait_time:.2f}秒")
                    time.sleep(wait_time)
        return wrapper
    return decorator
@retry(max_attempts=3)
def migrate_with_retry():
    """Placeholder migration step wrapped in ``retry(max_attempts=3)``.

    Put the real migration logic in this body.
    """
    return None

5.2 配置管理

import yaml
import os
class ConfigManager:
    """Load a YAML configuration file and expose its main sections."""

    def __init__(self, config_path='config.yaml'):
        # Use explicit UTF-8: the config may contain non-ASCII text, and the
        # platform default encoding is not UTF-8 everywhere (e.g. Windows).
        with open(config_path, 'r', encoding='utf-8') as f:
            # safe_load returns None for an empty file; fall back to {} so the
            # getters below keep working.
            self.config = yaml.safe_load(f) or {}

    def get_source_config(self):
        """Return the ``source`` section, or ``{}`` if absent."""
        return self.config.get('source', {})

    def get_target_config(self):
        """Return the ``target`` section, or ``{}`` if absent."""
        return self.config.get('target', {})

    def get_migration_rules(self):
        """Return the ``migration_rules`` section, or ``[]`` if absent."""
        return self.config.get('migration_rules', [])

5.3 监控和日志

import logging
from datetime import datetime
class MigrationMonitor:
    """Track record counts for a migration run and report a summary.

    ``log_file`` is passed to ``logging.basicConfig``.  Recorded failures and
    the final summary are written through this module's logger; the summary
    is also printed to stdout.
    """

    def __init__(self, log_file='migration.log'):
        # basicConfig only configures the root logger if it has no handlers
        # yet; if logging was configured earlier, log_file has no effect.
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s'
        )
        self.logger = logging.getLogger(__name__)
        self.stats = {
            'total_records': 0,
            'success_records': 0,
            'failed_records': 0,
            'start_time': datetime.now()
        }

    def record_progress(self, success=True, count=1):
        """Add ``count`` to the success or failure counter, and to the total."""
        bucket = 'success_records' if success else 'failed_records'
        self.stats[bucket] += count
        self.stats['total_records'] += count
        if not success:
            self.logger.warning("迁移失败记录: %d 条", count)

    def print_summary(self):
        """Print the totals and elapsed time, and write the same summary to the log."""
        duration = datetime.now() - self.stats['start_time']
        print(f"""
        迁移完成!
        总记录数: {self.stats['total_records']}
        成功: {self.stats['success_records']}
        失败: {self.stats['failed_records']}
        耗时: {duration}
        """)
        self.logger.info(
            "迁移完成: 总记录数=%d, 成功=%d, 失败=%d, 耗时=%s",
            self.stats['total_records'],
            self.stats['success_records'],
            self.stats['failed_records'],
            duration,
        )

6. 完整迁移示例

def main():
    """Migrate a fixed list of tables from MySQL to PostgreSQL and print a summary.

    The connection settings below, including the credentials, are
    placeholders; replace them before running.
    """
    config = {
        'source': {
            'type': 'mysql',
            'host': 'source_host',
            'user': 'root',          # placeholder credentials
            'password': 'password',
            'database': 'source_db'
        },
        'target': {
            'type': 'postgresql',
            'host': 'target_host',
            'user': 'postgres',      # placeholder credentials
            'password': 'password',
            'database': 'target_db'
        },
        'tables': ['users', 'orders', 'products']
    }
    monitor = MigrationMonitor()
    # One migrator serves every table; migrate_table opens its own connections.
    migrator = MySQLtoPostgreSQL(config['source'], config['target'])
    for table in config['tables']:
        try:
            print(f"开始迁移表: {table}")
            migrated = migrator.migrate_table(table)
            # Record the real row count when migrate_table returns one (an
            # int); otherwise no count is available and 0 is recorded.
            monitor.record_progress(
                success=True,
                count=migrated if isinstance(migrated, int) else 0,
            )
            print(f"表 {table} 迁移成功")
        except Exception as e:
            monitor.record_progress(success=False)
            print(f"表 {table} 迁移失败: {e}")
    monitor.print_summary()


if __name__ == '__main__':
    main()

这些案例涵盖了常见的数据迁移场景,你可以根据实际需求选择合适的方案。关键是要注意以下几点:

  1. 数据一致性:确保迁移过程中数据不丢失、不重复,迁移完成后可核对源表与目标表的记录数
  2. 性能优化:使用批量操作和分页(或流式)读取
  3. 错误处理:实现重试机制和异常处理,失败时避免留下只迁移了一半的数据
  4. 监控日志:记录迁移过程和结果

抱歉,评论功能暂时关闭!