Python案例如何定时同步数据?

wen python案例 77

本文目录导读:

Python案例如何定时同步数据?

  1. 使用 time.sleep() - 最简单的方式
  2. 使用 schedule 库 - 更灵活的定时任务
  3. 使用 APScheduler - 功能最强大的定时任务
  4. 完整的实际案例 - 数据同步服务
  5. 注意事项

我来分享几种Python定时同步数据的常用方法,从简单到专业。

使用 time.sleep() - 最简单的方式

import time
from datetime import datetime
def sync_data():
    """模拟数据同步函数"""
    print(f"[{datetime.now()}] 开始同步数据...")
    # 在这里写你的数据同步逻辑
    print(f"[{datetime.now()}] 数据同步完成")
def run_sync(interval_minutes=30):
    """定时执行同步"""
    while True:
        sync_data()
        print(f"等待 {interval_minutes} 分钟后再次同步...")
        time.sleep(interval_minutes * 60)  # 转换为秒
# 使用示例
run_sync(interval_minutes=30)

使用 schedule 库 - 更灵活的定时任务

import schedule
import time
from datetime import datetime
def sync_data():
    """数据同步函数"""
    print(f"[{datetime.now()}] 执行数据同步...")
    # 实际数据同步代码
    # 从API获取数据写入数据库
    pass
def backup_data():
    """数据备份函数"""
    print(f"[{datetime.now()}] 执行数据备份...")
    # 数据备份代码
    pass
# 设置定时任务
schedule.every(30).minutes.do(sync_data)           # 每30分钟同步
schedule.every().hour.do(sync_data)                 # 每小时同步
schedule.every().day.at("02:00").do(backup_data)    # 每天凌晨2点备份
schedule.every().monday.do(sync_data)               # 每周一同步
# 启动定时任务
while True:
    schedule.run_pending()
    time.sleep(1)

使用 APScheduler - 功能最强大的定时任务

from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
def sync_database_data():
    """同步数据库数据"""
    print(f"[{datetime.now()}] 开始同步数据库数据...")
    # 数据库同步代码
    print(f"[{datetime.now()}] 数据库同步完成")
def sync_api_data():
    """同步API数据"""
    print(f"[{datetime.now()}] 开始同步API数据...")
    # API数据同步代码
    print(f"[{datetime.now()}] API数据同步完成")
def cleanup_old_data():
    """清理旧数据"""
    print(f"[{datetime.now()}] 开始清理旧数据...")
    # 数据清理代码
    print(f"[{datetime.now()}] 旧数据清理完成")
# 创建调度器
scheduler = BlockingScheduler()
# 添加定时任务
scheduler.add_job(
    sync_database_data,
    'interval',
    minutes=30,  # 每30分钟执行
    id='db_sync'
)
scheduler.add_job(
    sync_api_data,
    'cron',
    hour='*/2',  # 每2小时执行
    minute=0,
    id='api_sync'
)
scheduler.add_job(
    cleanup_old_data,
    'cron',
    day_of_week='mon-fri',
    hour='3',
    minute='0',  # 工作日凌晨3点执行
    id='cleanup'
)
# 启动调度器
if __name__ == '__main__':
    print("定时任务启动...")
    scheduler.start()

完整的实际案例 - 数据同步服务

import time
import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
import requests
import mysql.connector
import json
# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('sync_data.log'),
        logging.StreamHandler()
    ]
)
logger = logging.getLogger(__name__)
class DataSyncService:
    def __init__(self):
        self.scheduler = BackgroundScheduler(
            executors={
                'default': ThreadPoolExecutor(10)
            }
        )
        self.db_config = {
            'host': 'localhost',
            'user': 'root',
            'password': 'password',
            'database': 'my_database'
        }
    def sync_external_api(self):
        """同步外部API数据"""
        try:
            logger.info("开始同步外部API数据...")
            # 模拟API请求
            response = requests.get(
                'https://api.example.com/data',
                timeout=30
            )
            if response.status_code == 200:
                data = response.json()
                self.save_to_database(data)
                logger.info(f"外部API数据同步完成,同步了 {len(data)} 条记录")
            else:
                logger.error(f"API请求失败,状态码: {response.status_code}")
        except Exception as e:
            logger.error(f"外部API同步失败: {str(e)}")
    def sync_database_to_backup(self):
        """数据库备份"""
        try:
            logger.info("开始数据库备份...")
            # 连接数据库
            conn = mysql.connector.connect(**self.db_config)
            cursor = conn.cursor()
            # 执行备份
            cursor.execute("SELECT * FROM my_table")
            data = cursor.fetchall()
            # 保存备份文件
            backup_file = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
            with open(backup_file, 'w', encoding='utf-8') as f:
                json.dump(data, f, ensure_ascii=False, default=str)
            cursor.close()
            conn.close()
            logger.info(f"数据库备份完成,保存到 {backup_file}")
        except Exception as e:
            logger.error(f"数据库备份失败: {str(e)}")
    def sync_file_system(self):
        """同步文件系统"""
        try:
            logger.info("开始文件系统同步...")
            # 这里写文件同步逻辑
            # 将本地文件同步到云存储
            logger.info("文件系统同步完成")
        except Exception as e:
            logger.error(f"文件系统同步失败: {str(e)}")
    def save_to_database(self, data):
        """保存数据到数据库"""
        try:
            conn = mysql.connector.connect(**self.db_config)
            cursor = conn.cursor()
            for item in data:
                cursor.execute("""
                    INSERT INTO target_table (field1, field2, updated_at) 
                    VALUES (%s, %s, NOW())
                    ON DUPLICATE KEY UPDATE field1=VALUES(field1)
                """, (item['field1'], item['field2']))
            conn.commit()
            cursor.close()
            conn.close()
        except Exception as e:
            logger.error(f"保存数据到数据库失败: {str(e)}")
    def start(self):
        """启动所有定时任务"""
        # 不同任务设置不同频率
        self.scheduler.add_job(
            self.sync_external_api,
            'interval',
            minutes=30,
            id='api_sync',
            name='同步API数据'
        )
        self.scheduler.add_job(
            self.sync_database_to_backup,
            'cron',
            hour='3,15',
            minute='0',
            id='db_backup',
            name='数据库备份'
        )
        self.scheduler.add_job(
            self.sync_file_system,
            'interval',
            hours=1,
            id='file_sync',
            name='文件同步'
        )
        # 启动前立即执行一次
        self.sync_external_api()
        self.scheduler.start()
        logger.info("数据同步服务已启动")
        try:
            # 保持主线程运行
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            self.scheduler.shutdown()
            logger.info("数据同步服务已停止")
# 使用示例
if __name__ == '__main__':
    sync_service = DataSyncService()
    sync_service.start()

注意事项

错误处理

def safe_sync(func):
    """装饰器:安全执行同步任务"""
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            logger.error(f"同步任务 {func.__name__} 失败: {str(e)}")
            # 可以发送告警通知
            send_alert(f"同步失败: {str(e)}")
    return wrapper
@safe_sync
def critical_sync_task():
    # 关键同步任务
    pass

配置管理

# config.yaml
sync_config:
  database:
    host: localhost
    port: 3306
    user: root
    password: ${DB_PASSWORD}
  tasks:
    - name: api_sync
      type: interval
      interval: 30
      unit: minutes
      function: sync_external_api
    - name: db_backup
      type: cron
      expression: "0 3 * * *"
      function: sync_database_to_backup

建议:

  • 对于简单场景使用 schedule
  • 对于复杂业务使用 APScheduler
  • 生产环境建议使用专门的分布式任务调度系统
  • 添加重试机制和告警通知
  • 做好日志记录和监控

选择哪种方式主要取决于你的具体需求:简单定时任务用 time.sleep(),中等复杂度用 schedule,企业级应用用 APScheduler

抱歉,评论功能暂时关闭!