本文目录导读:

下面介绍用 Python 实现数据迁移的几种常见案例和方法。
数据库到数据库迁移
1 MySQL到PostgreSQL迁移
import pymysql
import psycopg2
from psycopg2.extras import RealDictCursor
class MySQLtoPostgreSQL:
    """Copy the contents of MySQL tables into same-named PostgreSQL tables.

    Both configuration dicts must provide ``host``, ``user``, ``password``
    and ``database``; an optional ``port`` entry is honoured when present.
    The target table is expected to exist already, with column names that
    match the MySQL source table.
    """

    def __init__(self, mysql_config, pg_config):
        self.mysql_config = mysql_config
        self.pg_config = pg_config

    def connect_mysql(self):
        """Open and return a new connection to the MySQL source."""
        return pymysql.connect(
            host=self.mysql_config['host'],
            port=self.mysql_config.get('port', 3306),
            user=self.mysql_config['user'],
            password=self.mysql_config['password'],
            database=self.mysql_config['database'],
            charset='utf8mb4',
        )

    def connect_postgresql(self):
        """Open and return a new connection to the PostgreSQL target."""
        return psycopg2.connect(
            host=self.pg_config['host'],
            port=self.pg_config.get('port', 5432),
            user=self.pg_config['user'],
            password=self.pg_config['password'],
            database=self.pg_config['database'],
        )

    @staticmethod
    def _build_insert_sql(table_name, columns):
        """Return the parameterised INSERT statement for *columns*."""
        placeholders = ','.join(['%s'] * len(columns))
        columns_str = ','.join(columns)
        return f"INSERT INTO {table_name} ({columns_str}) VALUES ({placeholders})"

    def migrate_table(self, table_name, batch_size=1000):
        """Replace the target table's rows with the source table's rows.

        The target table is truncated, then rows are streamed from MySQL
        and inserted in batches of ``batch_size``, committing after every
        batch. A failure part-way through therefore leaves the target
        holding only the batches committed so far.

        Args:
            table_name: Table to copy. It is interpolated into SQL as-is,
                so it must come from trusted configuration.
            batch_size: Number of rows fetched and inserted per round.

        Returns:
            The number of rows copied.

        Raises:
            ValueError: If ``batch_size`` is smaller than 1.
        """
        if batch_size < 1:
            raise ValueError("batch_size must be at least 1")

        mysql_conn = self.connect_mysql()
        try:
            # Connect inside the try so the MySQL connection is released
            # even when the PostgreSQL connection cannot be opened.
            pg_conn = self.connect_postgresql()
            try:
                return self._copy_rows(mysql_conn, pg_conn, table_name, batch_size)
            finally:
                # Closing discards any uncommitted work on the target.
                pg_conn.close()
        finally:
            mysql_conn.close()

    def _copy_rows(self, mysql_conn, pg_conn, table_name, batch_size):
        """Stream every row of *table_name* from MySQL into PostgreSQL."""
        # Read the column list from the source table definition.
        with mysql_conn.cursor(pymysql.cursors.DictCursor) as meta_cursor:
            meta_cursor.execute(f"DESCRIBE {table_name}")
            columns = [col['Field'] for col in meta_cursor.fetchall()]

        # The statement does not change between batches; build it once.
        insert_sql = self._build_insert_sql(table_name, columns)

        pg_cursor = pg_conn.cursor()
        # Empty the target table before loading.
        pg_cursor.execute(f"TRUNCATE TABLE {table_name}")

        migrated = 0
        # An unbuffered cursor streams one consistent result set. Paging
        # with LIMIT/OFFSET and no ORDER BY has no guaranteed row order, so
        # rows could be skipped or duplicated between pages.
        with mysql_conn.cursor(pymysql.cursors.SSDictCursor) as source_cursor:
            source_cursor.execute(f"SELECT * FROM {table_name}")
            while True:
                rows = source_cursor.fetchmany(batch_size)
                if not rows:
                    break
                pg_cursor.executemany(
                    insert_sql,
                    [[row[col] for col in columns] for row in rows],
                )
                pg_conn.commit()
                migrated += len(rows)
                print(f"已迁移 {migrated} 条记录到 {table_name}")

        # With an empty source no batch commit ran; commit here so the
        # TRUNCATE still takes effect.
        pg_conn.commit()
        return migrated
# Usage example: describe both databases, then migrate the `users` table.
mysql_config = dict(
    host='localhost',
    user='root',
    password='password',
    database='source_db',
)
pg_config = dict(
    host='localhost',
    user='postgres',
    password='password',
    database='target_db',
)
migrator = MySQLtoPostgreSQL(mysql_config=mysql_config, pg_config=pg_config)
migrator.migrate_table(table_name='users')
CSV/Excel文件到数据库
1 CSV导入到MySQL
import pandas as pd
from sqlalchemy import create_engine
import logging
class CSVtoDatabase:
    """Load CSV and Excel files into a SQL database through SQLAlchemy."""

    def __init__(self, connection_string):
        self.engine = create_engine(connection_string)
        logging.basicConfig(level=logging.INFO)
        self.logger = logging.getLogger(__name__)

    def csv_to_mysql(self, csv_file, table_name, chunksize=10000):
        """Append the rows of a CSV file to ``table_name``.

        The file is read in chunks of ``chunksize`` rows so that large
        files do not have to fit in memory. Any error is logged and then
        re-raised.
        """
        try:
            # The chunked reader holds the file open; the context manager
            # closes it even when a write fails part-way through.
            with pd.read_csv(csv_file, chunksize=chunksize) as reader:
                for chunk in reader:
                    chunk.to_sql(
                        name=table_name,
                        con=self.engine,
                        if_exists='append',
                        index=False,
                        method='multi',
                    )
                    self.logger.info(f"已写入 {len(chunk)} 条记录到 {table_name}")
            self.logger.info(f"CSV文件 {csv_file} 成功导入到 {table_name}")
        except Exception as e:
            self.logger.error(f"导入失败: {e}")
            raise

    def excel_to_database(self, excel_file, target_db='mysql'):
        """Import every sheet of an Excel workbook as its own table.

        Each sheet is cleaned with :meth:`clean_data` and written to a
        table named after the sheet, replacing any existing table.
        ``target_db`` is accepted but not used by this method.
        """
        # sheet_name=None loads all sheets as {sheet name: DataFrame}.
        excel_data = pd.read_excel(excel_file, sheet_name=None)
        for sheet_name, df in excel_data.items():
            df = self.clean_data(df)
            df.to_sql(
                name=sheet_name,
                con=self.engine,
                if_exists='replace',
                index=False,
            )
            self.logger.info(f"Sheet {sheet_name} 已导入,共 {len(df)} 条记录")

    def clean_data(self, df):
        """Return a cleaned copy of ``df``.

        Missing values become empty strings, every character in a column
        name other than ASCII letters, digits and underscore becomes
        ``_``, and surrounding whitespace is stripped from string values.
        """
        def strip_text(value):
            # Only touch real strings. `.str.strip()` on a mixed column
            # would turn every non-string value into NaN.
            return value.strip() if isinstance(value, str) else value

        # Replace missing values with empty strings.
        df = df.fillna('')
        # Sanitise column names. regex=True is required: newer pandas
        # treats the pattern as a literal string by default.
        df.columns = df.columns.astype(str).str.replace(
            r'[^a-zA-Z0-9_]', '_', regex=True
        )
        # Strip leading/trailing whitespace in text columns.
        df = df.apply(
            lambda col: col.map(strip_text)
            if pd.api.types.is_string_dtype(col.dtype)
            else col
        )
        return df
# Usage example: import one CSV file and one Excel workbook.
db_connection = 'mysql+pymysql://root:password@localhost:3306/target_db'
importer = CSVtoDatabase(connection_string=db_connection)
# CSV import
importer.csv_to_mysql(csv_file='data.csv', table_name='users')
# Excel import
importer.excel_to_database(excel_file='data.xlsx')
API到数据库迁移
1 REST API数据同步
import requests
import json
from datetime import datetime, timedelta
import sqlite3
from typing import List, Dict
import time
class APIToDatabase:
    """Pull records from a paginated REST API into a local SQLite file."""

    def __init__(self, api_base_url, db_path='migration.db'):
        self.api_base_url = api_base_url
        self.db_path = db_path
        self.session = requests.Session()

    @staticmethod
    def _quote_identifier(name) -> str:
        """Return *name* as a double-quoted SQLite identifier."""
        # Embedded double quotes are escaped by doubling them.
        return '"' + str(name).replace('"', '""') + '"'

    @staticmethod
    def _log_run(cursor, endpoint, records, start_time, status):
        """Insert one row describing a migration run into migration_log."""
        # Timestamps are stored as ISO text explicitly, because the
        # implicit sqlite3 datetime adapter is deprecated in Python 3.12.
        cursor.execute('''
            INSERT INTO migration_log
            (api_endpoint, records_imported, start_time, end_time, status)
            VALUES (?, ?, ?, ?, ?)
        ''', (
            endpoint,
            records,
            start_time.isoformat(' '),
            datetime.now().isoformat(' '),
            status,
        ))

    def init_database(self):
        """Create the ``migration_log`` table if it does not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS migration_log (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    api_endpoint TEXT,
                    records_imported INTEGER,
                    start_time TIMESTAMP,
                    end_time TIMESTAMP,
                    status TEXT
                )
            ''')
            conn.commit()
        finally:
            conn.close()

    def fetch_paginated_data(self, endpoint: str, page_size: int = 100) -> List[Dict]:
        """Collect all records of *endpoint*, one page at a time.

        Pages are requested with ``page`` and ``size`` query parameters
        until a page shorter than ``page_size`` arrives. If a request
        fails, the error is printed and whatever was collected so far is
        returned.
        """
        all_data = []
        page = 1
        while True:
            try:
                response = self.session.get(
                    f"{self.api_base_url}/{endpoint}",
                    params={'page': page, 'size': page_size},
                    timeout=30,
                )
                response.raise_for_status()
                data = response.json()
            except requests.exceptions.RequestException as e:
                print(f"请求失败: {e}")
                break

            # Accept both {"data": [...], ...} envelopes and bare lists;
            # a bare list has no .get().
            items = data.get('data', data) if isinstance(data, dict) else data
            all_data.extend(items)

            # A short page means there is nothing left to fetch.
            if len(items) < page_size:
                break
            page += 1
            time.sleep(0.5)  # throttle between requests
        return all_data

    def migrate_api_data(self, endpoint: str, table_name: str = None):
        """Fetch *endpoint* and store its records in a fresh SQLite table.

        The table (default name: the endpoint with ``/`` replaced by
        ``_``) is dropped and recreated with one TEXT column per key seen
        in any record; values are stored via ``str()``. The outcome is
        recorded in ``migration_log``. Errors are logged and printed, not
        re-raised.
        """
        self.init_database()
        start_time = datetime.now()
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        try:
            data = self.fetch_paginated_data(endpoint)
            if not table_name:
                table_name = endpoint.replace('/', '_')

            if data:
                # Ordered union of keys, so fields that are missing from
                # the first record are not dropped.
                columns = list(dict.fromkeys(key for item in data for key in item))
                # Keys come from the remote API: quote them so reserved
                # words or punctuation cannot break the SQL.
                table_sql = self._quote_identifier(table_name)
                quoted_columns = [self._quote_identifier(col) for col in columns]
                columns_sql = ', '.join(f"{col} TEXT" for col in quoted_columns)
                cursor.execute(f"DROP TABLE IF EXISTS {table_sql}")
                cursor.execute(f"CREATE TABLE {table_sql} ({columns_sql})")

                placeholders = ', '.join('?' for _ in columns)
                insert_sql = (
                    f"INSERT INTO {table_sql} ({', '.join(quoted_columns)}) "
                    f"VALUES ({placeholders})"
                )
                cursor.executemany(
                    insert_sql,
                    ([str(item.get(col, '')) for col in columns] for item in data),
                )
                conn.commit()

            self._log_run(cursor, endpoint, len(data), start_time, 'success')
            conn.commit()
            print(f"迁移完成: {endpoint} -> {len(data)} 条记录")
        except Exception as e:
            # Drop half-written rows so they are not committed together
            # with the failure record.
            conn.rollback()
            self._log_run(cursor, endpoint, 0, start_time, f'failed: {str(e)}')
            conn.commit()
            print(f"迁移失败: {e}")
        finally:
            conn.close()
# Usage example: migrate the `users` and `orders` endpoints in turn.
migrator = APIToDatabase(api_base_url='https://api.example.com/v1')
for _endpoint in ('users', 'orders'):
    migrator.migrate_api_data(endpoint=_endpoint)
实时数据同步
1 使用Redis作为中间缓存
import redis
import json
from concurrent.futures import ThreadPoolExecutor
import threading
import time
class RealTimeSync:
    """Continuously move JSON items from one Redis list to another."""

    def __init__(self, source_config, target_config):
        self.source_redis = redis.StrictRedis(**source_config)
        self.target_redis = redis.StrictRedis(**target_config)
        self.running = True
        self.executor = ThreadPoolExecutor(max_workers=4)
        # Source list currently being drained; set by start_sync() and
        # used as the requeue destination when a write fails.
        self.source_key = None

    def start_sync(self, source_key, target_key, batch_size=100):
        """Drain ``source_key`` into ``target_key`` until stop() is called.

        Each round takes up to ``batch_size`` items off the head of the
        source list and hands them to a worker thread for writing. This
        call blocks the calling thread.
        """
        print(f"开始同步 {source_key} -> {target_key}")
        self.source_key = source_key
        while self.running:
            try:
                # Read the batch and trim it off the list in one pipeline.
                pipeline = self.source_redis.pipeline()
                pipeline.lrange(source_key, 0, batch_size - 1)
                pipeline.ltrim(source_key, batch_size, -1)
                items = pipeline.execute()[0]
                if items:
                    # Write to the target asynchronously.
                    self.executor.submit(
                        self.write_to_target,
                        items,
                        target_key,
                        source_key,
                    )
                time.sleep(0.1)
            except Exception as e:
                print(f"同步错误: {e}")
                time.sleep(1)

    def write_to_target(self, items, target_key, source_key=None):
        """Append ``items`` to ``target_key``; requeue them on failure.

        Args:
            items: Raw JSON strings taken from the source list.
            target_key: Destination list in the target Redis.
            source_key: List to push the items back onto if the write
                fails. Defaults to the key given to start_sync().
        """
        try:
            pipeline = self.target_redis.pipeline()
            for item in items:
                data = json.loads(item)
                # Data transformation can be applied here.
                pipeline.rpush(target_key, json.dumps(data))
            pipeline.execute()
            print(f"已同步 {len(items)} 条数据")
        except Exception as e:
            print(f"写入目标失败: {e}")
            requeue_key = source_key
            if requeue_key is None:
                requeue_key = getattr(self, 'source_key', None)
            if requeue_key is None:
                print(f"无法将 {len(items)} 条数据放回源队列: 未指定源队列")
                return
            # LPUSH inserts values one by one at the head, so push them in
            # reverse to restore the batch in its original order.
            self.source_redis.lpush(requeue_key, *reversed(items))

    def stop(self):
        """Stop the sync loop and wait for pending writes to finish."""
        self.running = False
        self.executor.shutdown(wait=True)
# Usage example: sync until interrupted with Ctrl+C, then shut down cleanly.
sync = RealTimeSync(
    dict(host='source-redis', port=6379),
    dict(host='target-redis', port=6379),
)
try:
    sync.start_sync(source_key='source_queue', target_key='target_queue')
except KeyboardInterrupt:
    sync.stop()
最佳实践建议
1 错误处理和重试机制
from functools import wraps
import time
import random
def retry(max_attempts=3, delay=1, backoff=2, jitter=1.0):
    """Build a decorator that retries a function with exponential backoff.

    Args:
        max_attempts: Total number of calls before giving up (>= 1).
        delay: Base wait in seconds.
        backoff: Multiplier; the wait after the n-th failed attempt is
            ``delay * backoff ** n`` plus jitter.
        jitter: Upper bound, in seconds, of the random extra wait added
            to each pause.

    Returns:
        A decorator. The wrapped function returns the first successful
        result and re-raises the last exception once attempts run out.

    Raises:
        ValueError: If ``max_attempts`` is smaller than 1; otherwise the
            wrapped function would never be called and would silently
            return None.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt == max_attempts:
                        # Bare raise keeps the original traceback intact.
                        raise
                    wait_time = delay * (backoff ** attempt) + random.uniform(0, jitter)
                    print(f"重试 {attempt}: {e}, 等待 {wait_time:.2f}秒")
                    time.sleep(wait_time)
        return wrapper
    return decorator
@retry(max_attempts=3)
def migrate_with_retry():
    """Placeholder for migration logic that is retried up to three times."""
    # Migration logic goes here.
    return None
2 配置管理
import yaml
import os
class ConfigManager:
    """Read migration settings from a YAML file."""

    def __init__(self, config_path='config.yaml'):
        """Load ``config_path``.

        Raises:
            ValueError: If the top level of the file is not a mapping.
        """
        # Explicit encoding so non-ASCII values load the same way on
        # every platform.
        with open(config_path, 'r', encoding='utf-8') as f:
            # An empty file parses to None; treat it as an empty config.
            loaded = yaml.safe_load(f) or {}
        if not isinstance(loaded, dict):
            raise ValueError(
                f"{config_path}: top-level YAML value must be a mapping"
            )
        self.config = loaded

    def get_source_config(self):
        """Return the ``source`` section, or an empty dict if absent."""
        return self.config.get('source', {})

    def get_target_config(self):
        """Return the ``target`` section, or an empty dict if absent."""
        return self.config.get('target', {})

    def get_migration_rules(self):
        """Return the ``migration_rules`` list, or an empty list if absent."""
        return self.config.get('migration_rules', [])
3 监控和日志
import logging
from datetime import datetime
class MigrationMonitor:
    """Keep running counters for a migration and print a final summary."""

    def __init__(self, log_file='migration.log'):
        """Configure file logging and reset all counters."""
        logging.basicConfig(
            filename=log_file,
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
        )
        self.stats = dict(
            total_records=0,
            success_records=0,
            failed_records=0,
            start_time=datetime.now(),
        )

    def record_progress(self, success=True, count=1):
        """Add ``count`` to the success or failure counter and to the total."""
        bucket = 'success_records' if success else 'failed_records'
        self.stats[bucket] += count
        self.stats['total_records'] += count

    def print_summary(self):
        """Print the counters and the time elapsed since construction."""
        elapsed = datetime.now() - self.stats['start_time']
        # The empty first and last entries reproduce the blank lines that
        # frame the report.
        report = [
            "",
            "迁移完成!",
            f"总记录数: {self.stats['total_records']}",
            f"成功: {self.stats['success_records']}",
            f"失败: {self.stats['failed_records']}",
            f"耗时: {elapsed}",
            "",
        ]
        print("\n".join(report))
完整迁移示例
def main():
    """Migrate a fixed list of tables from MySQL to PostgreSQL.

    Each table is migrated independently: a failure is reported and
    counted, and the remaining tables are still attempted. A summary is
    printed at the end.
    """
    # Example configuration. The migrator needs `user` and `password` in
    # both sections; the values below are placeholders to replace.
    config = {
        'source': {
            'type': 'mysql',
            'host': 'source_host',
            'user': 'root',
            'password': 'password',
            'database': 'source_db',
        },
        'target': {
            'type': 'postgresql',
            'host': 'target_host',
            'user': 'postgres',
            'password': 'password',
            'database': 'target_db',
        },
        'tables': ['users', 'orders', 'products'],
    }
    monitor = MigrationMonitor()
    # The migrator only stores the two configs, so one instance can
    # serve every table.
    migrator = MySQLtoPostgreSQL(config['source'], config['target'])
    for table in config['tables']:
        try:
            print(f"开始迁移表: {table}")
            migrator.migrate_table(table)
        except Exception as e:
            monitor.record_progress(success=False)
            print(f"表 {table} 迁移失败: {e}")
        else:
            # Progress is counted per table in both branches, so the
            # success and failure totals use the same unit.
            monitor.record_progress(success=True)
            print(f"表 {table} 迁移成功")
    monitor.print_summary()


if __name__ == '__main__':
    main()
这些案例涵盖了常见的数据迁移场景,你可以根据实际需求选择合适的方案。迁移时需要重点注意以下几点:
- 数据一致性:确保迁移过程中数据不丢失
- 性能优化:使用批量操作和分页处理
- 错误处理:实现重试机制和异常处理
- 监控日志:记录迁移过程和结果