本文目录导读:

我来分享几种Python定时同步数据的常用方法,从简单到专业。
使用 time.sleep() - 最简单的方式
import time
from datetime import datetime
def sync_data():
"""模拟数据同步函数"""
print(f"[{datetime.now()}] 开始同步数据...")
# 在这里写你的数据同步逻辑
print(f"[{datetime.now()}] 数据同步完成")
def run_sync(interval_minutes=30):
"""定时执行同步"""
while True:
sync_data()
print(f"等待 {interval_minutes} 分钟后再次同步...")
time.sleep(interval_minutes * 60) # 转换为秒
# 使用示例
run_sync(interval_minutes=30)
使用 schedule 库 - 更灵活的定时任务
import schedule
import time
from datetime import datetime
def sync_data():
"""数据同步函数"""
print(f"[{datetime.now()}] 执行数据同步...")
# 实际数据同步代码
# 从API获取数据写入数据库
pass
def backup_data():
"""数据备份函数"""
print(f"[{datetime.now()}] 执行数据备份...")
# 数据备份代码
pass
# 设置定时任务
schedule.every(30).minutes.do(sync_data) # 每30分钟同步
schedule.every().hour.do(sync_data) # 每小时同步
schedule.every().day.at("02:00").do(backup_data) # 每天凌晨2点备份
schedule.every().monday.do(sync_data) # 每周一同步
# 启动定时任务
while True:
schedule.run_pending()
time.sleep(1)
使用 APScheduler - 功能最强大的定时任务
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.triggers.cron import CronTrigger
from datetime import datetime
def sync_database_data():
"""同步数据库数据"""
print(f"[{datetime.now()}] 开始同步数据库数据...")
# 数据库同步代码
print(f"[{datetime.now()}] 数据库同步完成")
def sync_api_data():
"""同步API数据"""
print(f"[{datetime.now()}] 开始同步API数据...")
# API数据同步代码
print(f"[{datetime.now()}] API数据同步完成")
def cleanup_old_data():
"""清理旧数据"""
print(f"[{datetime.now()}] 开始清理旧数据...")
# 数据清理代码
print(f"[{datetime.now()}] 旧数据清理完成")
# 创建调度器
scheduler = BlockingScheduler()
# 添加定时任务
scheduler.add_job(
sync_database_data,
'interval',
minutes=30, # 每30分钟执行
id='db_sync'
)
scheduler.add_job(
sync_api_data,
'cron',
hour='*/2', # 每2小时执行
minute=0,
id='api_sync'
)
scheduler.add_job(
cleanup_old_data,
'cron',
day_of_week='mon-fri',
hour='3',
minute='0', # 工作日凌晨3点执行
id='cleanup'
)
# 启动调度器
if __name__ == '__main__':
print("定时任务启动...")
scheduler.start()
完整的实际案例 - 数据同步服务
import time
import logging
from datetime import datetime
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor
import requests
import mysql.connector
import json
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('sync_data.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class DataSyncService:
def __init__(self):
self.scheduler = BackgroundScheduler(
executors={
'default': ThreadPoolExecutor(10)
}
)
self.db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'my_database'
}
def sync_external_api(self):
"""同步外部API数据"""
try:
logger.info("开始同步外部API数据...")
# 模拟API请求
response = requests.get(
'https://api.example.com/data',
timeout=30
)
if response.status_code == 200:
data = response.json()
self.save_to_database(data)
logger.info(f"外部API数据同步完成,同步了 {len(data)} 条记录")
else:
logger.error(f"API请求失败,状态码: {response.status_code}")
except Exception as e:
logger.error(f"外部API同步失败: {str(e)}")
def sync_database_to_backup(self):
"""数据库备份"""
try:
logger.info("开始数据库备份...")
# 连接数据库
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
# 执行备份
cursor.execute("SELECT * FROM my_table")
data = cursor.fetchall()
# 保存备份文件
backup_file = f"backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(backup_file, 'w', encoding='utf-8') as f:
json.dump(data, f, ensure_ascii=False, default=str)
cursor.close()
conn.close()
logger.info(f"数据库备份完成,保存到 {backup_file}")
except Exception as e:
logger.error(f"数据库备份失败: {str(e)}")
def sync_file_system(self):
"""同步文件系统"""
try:
logger.info("开始文件系统同步...")
# 这里写文件同步逻辑
# 将本地文件同步到云存储
logger.info("文件系统同步完成")
except Exception as e:
logger.error(f"文件系统同步失败: {str(e)}")
def save_to_database(self, data):
"""保存数据到数据库"""
try:
conn = mysql.connector.connect(**self.db_config)
cursor = conn.cursor()
for item in data:
cursor.execute("""
INSERT INTO target_table (field1, field2, updated_at)
VALUES (%s, %s, NOW())
ON DUPLICATE KEY UPDATE field1=VALUES(field1)
""", (item['field1'], item['field2']))
conn.commit()
cursor.close()
conn.close()
except Exception as e:
logger.error(f"保存数据到数据库失败: {str(e)}")
def start(self):
"""启动所有定时任务"""
# 不同任务设置不同频率
self.scheduler.add_job(
self.sync_external_api,
'interval',
minutes=30,
id='api_sync',
name='同步API数据'
)
self.scheduler.add_job(
self.sync_database_to_backup,
'cron',
hour='3,15',
minute='0',
id='db_backup',
name='数据库备份'
)
self.scheduler.add_job(
self.sync_file_system,
'interval',
hours=1,
id='file_sync',
name='文件同步'
)
# 启动前立即执行一次
self.sync_external_api()
self.scheduler.start()
logger.info("数据同步服务已启动")
try:
# 保持主线程运行
while True:
time.sleep(1)
except KeyboardInterrupt:
self.scheduler.shutdown()
logger.info("数据同步服务已停止")
# 使用示例
if __name__ == '__main__':
sync_service = DataSyncService()
sync_service.start()
注意事项
错误处理
def safe_sync(func):
"""装饰器:安全执行同步任务"""
def wrapper(*args, **kwargs):
try:
return func(*args, **kwargs)
except Exception as e:
logger.error(f"同步任务 {func.__name__} 失败: {str(e)}")
# 可以发送告警通知
send_alert(f"同步失败: {str(e)}")
return wrapper
@safe_sync
def critical_sync_task():
# 关键同步任务
pass
配置管理
# config.yaml
sync_config:
database:
host: localhost
port: 3306
user: root
password: ${DB_PASSWORD}
tasks:
- name: api_sync
type: interval
interval: 30
unit: minutes
function: sync_external_api
- name: db_backup
type: cron
expression: "0 3 * * *"
function: sync_database_to_backup
建议:
- 对于简单场景使用
schedule库 - 对于复杂业务使用
APScheduler - 生产环境建议使用专门的分布式任务调度系统
- 添加重试机制和告警通知
- 做好日志记录和监控
选择哪种方式主要取决于你的具体需求:简单定时任务用 time.sleep(),中等复杂度用 schedule,企业级应用用 APScheduler。