我来为你介绍几种Python定时备份数据的方法和案例。

使用 schedule 库定时备份
基础备份脚本
import schedule
import time
import shutil
import os
from datetime import datetime
import logging
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s'
)
def backup_data(source_path, backup_dir):
"""
备份数据函数
:param source_path: 源文件/目录路径
:param backup_dir: 备份目录
"""
try:
# 生成备份文件名(带时间戳)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = os.path.basename(source_path)
backup_file = f"{timestamp}_{filename}"
backup_path = os.path.join(backup_dir, backup_file)
# 创建备份目录(如果不存在)
os.makedirs(backup_dir, exist_ok=True)
# 执行备份
if os.path.isdir(source_path):
# 备份整个目录
shutil.copytree(source_path, backup_path)
else:
# 备份单个文件
shutil.copy2(source_path, backup_path)
logging.info(f"备份成功: {backup_path}")
# 清理旧备份(保留最近N个备份)
cleanup_old_backups(backup_dir, keep_count=7)
except Exception as e:
logging.error(f"备份失败: {e}")
def cleanup_old_backups(backup_dir, keep_count=7):
"""
清理旧备份文件
"""
files = [f for f in os.listdir(backup_dir)
if os.path.isfile(os.path.join(backup_dir, f))]
# 按修改时间排序
files.sort(key=lambda x: os.path.getmtime(os.path.join(backup_dir, x)))
# 删除多余文件
while len(files) > keep_count:
old_file = os.path.join(backup_dir, files.pop(0))
os.remove(old_file)
logging.info(f"清理旧备份: {old_file}")
# 配置备份任务
source = "/path/to/your/data" # 要备份的数据路径
backup_dir = "/path/to/backup" # 备份存储路径
# 每天凌晨2点备份
schedule.every().day.at("02:00").do(backup_data, source, backup_dir)
# 或者每小时备份一次
# schedule.every(1).hours.do(backup_data, source, backup_dir)
# 或者每小时的第30分钟备份
# schedule.every().hour.at(":30").do(backup_data, source, backup_dir)
# 运行任务
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
备份MySQL数据库
import schedule
import time
import subprocess
import os
from datetime import datetime
import logging
logging.basicConfig(level=logging.INFO)
def backup_mysql_database(db_config, backup_dir):
"""
备份MySQL数据库
"""
try:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_file = f"database_backup_{timestamp}.sql"
backup_path = os.path.join(backup_dir, backup_file)
# 创建备份目录
os.makedirs(backup_dir, exist_ok=True)
# MySQL命令
command = f"mysqldump -h {db_config['host']} -u {db_config['user']} -p{db_config['password']} {db_config['database']} > {backup_path}"
# 执行备份
result = subprocess.run(command, shell=True, capture_output=True, text=True)
if result.returncode == 0:
logging.info(f"数据库备份成功: {backup_file}")
# 压缩备份文件
import gzip
with open(backup_path, 'rb') as f_in:
with gzip.open(f"{backup_path}.gz", 'wb') as f_out:
f_out.writelines(f_in)
# 删除未压缩文件
os.remove(backup_path)
logging.info(f"备份文件已压缩: {backup_file}.gz")
# 清理旧备份
cleanup_old_backups(backup_dir, keep_count=7)
else:
logging.error(f"数据库备份失败: {result.stderr}")
except Exception as e:
logging.error(f"备份过程出错: {e}")
# 数据库配置
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'your_database'
}
backup_dir = "/path/to/backup/mysql"
# 每天凌晨3点备份数据库
schedule.every().day.at("03:00").do(backup_mysql_database, db_config, backup_dir)
while True:
schedule.run_pending()
time.sleep(60)
备份到远程服务器(使用rsync)
import schedule
import time
import subprocess
import logging
from datetime import datetime
logging.basicConfig(level=logging.INFO)
def backup_to_remote(source_path, remote_config):
"""
使用rsync备份到远程服务器
"""
try:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# rsync命令
command = [
"rsync", "-avz", "--delete",
source_path,
f"{remote_config['user']}@{remote_config['host']}:{remote_config['path']}"
]
# 执行rsync
result = subprocess.run(command, capture_output=True, text=True)
if result.returncode == 0:
logging.info(f"[{timestamp}] 远程备份成功")
logging.info(f"同步统计: {result.stdout.splitlines()[-1]}")
else:
logging.error(f"远程备份失败: {result.stderr}")
except Exception as e:
logging.error(f"备份过程出错: {e}")
# 配置
source = "/path/to/your/data"
remote_config = {
'host': '192.168.1.100',
'user': 'backup_user',
'path': '/backup/location'
}
# 每天凌晨4点备份到远程服务器
schedule.every().day.at("04:00").do(backup_to_remote, source, remote_config)
while True:
schedule.run_pending()
time.sleep(60)
使用配置文件管理多个备份任务
# config.yaml 配置文件
"""
tasks:
- name: "备份项目文件"
type: "local"
source: "/path/to/project"
destination: "/backup/project"
schedule: "02:00"
keep_count: 7
- name: "备份数据库"
type: "database"
db_type: "mysql"
host: "localhost"
database: "myapp"
schedule: "03:00"
keep_count: 14
- name: "备份到远程"
type: "remote"
source: "/path/to/data"
remote_host: "192.168.1.100"
remote_user: "backup"
remote_path: "/backup/data"
schedule: "04:00"
"""
# backup_manager.py
import yaml
import schedule
import time
import logging
logging.basicConfig(level=logging.INFO)
def load_config(config_file="config.yaml"):
"""加载配置文件"""
with open(config_file, 'r') as f:
return yaml.safe_load(f)
def setup_tasks(config):
"""根据配置设置定时任务"""
for task in config['tasks']:
if task['type'] == 'local':
from functools import partial
func = partial(backup_data, task['source'], task['destination'])
schedule.every().day.at(task['schedule']).do(func)
elif task['type'] == 'database':
db_config = {
'host': task['host'],
'database': task['database']
}
schedule.every().day.at(task['schedule']).do(
backup_mysql_database, db_config, task.get('destination', '/backup/db')
)
elif task['type'] == 'remote':
remote_config = {
'host': task['remote_host'],
'user': task['remote_user'],
'path': task['remote_path']
}
schedule.every().day.at(task['schedule']).do(
backup_to_remote, task['source'], remote_config
)
logging.info(f"已设置任务: {task['name']} @ {task['schedule']}")
# 主程序
if __name__ == "__main__":
config = load_config()
setup_tasks(config)
while True:
schedule.run_pending()
time.sleep(60)
使用Windows任务计划程序(Windows系统)
# windows_scheduler_backup.py
import os
import sys
import shutil
from datetime import datetime
import logging
logging.basicConfig(
level=logging.INFO,
filename='backup.log',
format='%(asctime)s - %(levelname)s - %(message)s'
)
def scheduled_backup():
"""Windows任务计划程序调用的备份函数"""
try:
# 备份配置
source = r"C:\MyData"
backup_dir = r"D:\Backups"
# 创建备份
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"backup_{timestamp}"
backup_path = os.path.join(backup_dir, backup_name)
shutil.copytree(source, backup_path)
logging.info(f"备份成功: {backup_path}")
# 清理超过30天的备份
cleanup_days = 30
for folder in os.listdir(backup_dir):
folder_path = os.path.join(backup_dir, folder)
if os.path.isdir(folder_path):
folder_time = os.path.getctime(folder_path)
if (time.time() - folder_time) > (cleanup_days * 86400):
shutil.rmtree(folder_path)
logging.info(f"清理过期备份: {folder}")
except Exception as e:
logging.error(f"备份失败: {e}")
if __name__ == "__main__":
scheduled_backup()
安装依赖
# 安装schedule库 pip install schedule # 如果是YAML配置方式 pip install pyyaml # 安装其他可能需要依赖 pip install schedule pyyaml
运行建议
-
在生产环境使用:
- 使用
nohup或systemd服务保持脚本运行 - 添加错误处理和日志记录
- 设置合理的备份保留策略
- 使用
-
作为systemd服务运行:
# /etc/systemd/system/backup.service """ [Unit] Description=Data Backup Service After=network.target
[Service] Type=simple ExecStart=/usr/bin/python3 /path/to/backup_script.py Restart=always User=backup_user
[Install] WantedBy=multi-user.target """
启用服务
sudo systemctl enable backup.service sudo systemctl start backup.service
这些方案覆盖了不同的备份场景,你可以根据实际需求选择合适的方案或组合使用,记得在实际部署前进行充分测试。