本文目录导读:

我来为你介绍几种Python实现定时清理任务的方法:
方法一:使用 schedule 库(适合轻量级定时任务)
import schedule
import time
import os
import shutil
from datetime import datetime, timedelta
def cleanup_temp_files(temp_dir='/tmp/old_files', max_age_hours=24):
    """Delete stale regular files directly inside a temp directory.

    Args:
        temp_dir: Directory to sweep. If it does not exist it is created
            and the function returns without deleting anything.
        max_age_hours: Files whose modification time is older than this
            many hours are removed.

    Only top-level regular files are considered; sub-directories are left
    untouched. A failure on one file is reported and the sweep continues.
    """
    if not os.path.exists(temp_dir):
        os.makedirs(temp_dir)
        print(f"创建目录: {temp_dir}")
        return

    # Compare raw timestamps so the only error the loop can hit is OSError.
    cutoff_ts = (datetime.now() - timedelta(hours=max_age_hours)).timestamp()
    for filename in os.listdir(temp_dir):
        filepath = os.path.join(temp_dir, filename)
        if not os.path.isfile(filepath):
            continue
        try:
            # The stat call lives inside the try: the file may disappear
            # between listdir() and here, and that must not abort the sweep.
            if os.path.getmtime(filepath) < cutoff_ts:
                os.remove(filepath)
                print(f"删除文件: {filepath}")
        except OSError as e:
            print(f"删除失败: {filepath}, 错误: {e}")
def cleanup_old_logs(log_dir='./logs', max_days=7):
    """Delete ``*.log`` files in a directory that are older than a retention window.

    Args:
        log_dir: Directory to sweep. Nothing happens if it does not exist.
        max_days: Number of days of logs to keep, judged by modification time.

    Only top-level regular files whose name ends in ``.log`` are considered.
    A failure on one file is reported and the sweep continues.
    """
    if not os.path.exists(log_dir):
        return

    cutoff_ts = (datetime.now() - timedelta(days=max_days)).timestamp()
    for filename in os.listdir(log_dir):
        if not filename.endswith('.log'):
            continue
        filepath = os.path.join(log_dir, filename)
        if not os.path.isfile(filepath):
            continue
        try:
            # Stat inside the try so a file rotated away mid-sweep is
            # reported instead of terminating the job.
            if os.path.getmtime(filepath) < cutoff_ts:
                os.remove(filepath)
                print(f"删除旧日志: {filepath}")
        except OSError as e:
            print(f"删除失败: {filepath}, 错误: {e}")
def cleanup_cache_directory(cache_dir='./cache', max_size_mb=100):
    """Trim a cache directory to a size budget by deleting its oldest files.

    Args:
        cache_dir: Directory to inspect. Nothing happens if it does not exist.
        max_size_mb: Size budget in MiB for the top-level regular files.

    Files are removed in order of ascending modification time until the
    remaining total is within the budget. Sub-directories are ignored.
    Files that cannot be inspected or removed are reported and skipped.
    """
    if not os.path.exists(cache_dir):
        return

    total_size = 0
    files_info = []  # (path, size in bytes, mtime)
    for filename in os.listdir(cache_dir):
        filepath = os.path.join(cache_dir, filename)
        if not os.path.isfile(filepath):
            continue
        try:
            size = os.path.getsize(filepath)
            mtime = os.path.getmtime(filepath)
        except OSError as e:
            # The file may vanish between listing and stat; skip it.
            print(f"读取文件信息失败: {filepath}, 错误: {e}")
            continue
        files_info.append((filepath, size, mtime))
        total_size += size

    max_size_bytes = max_size_mb * 1024 * 1024
    if total_size <= max_size_bytes:
        return

    # Oldest first, so the most recently written entries survive.
    files_info.sort(key=lambda info: info[2])
    for filepath, size, _ in files_info:
        if total_size <= max_size_bytes:
            break
        try:
            os.remove(filepath)
        except OSError as e:
            print(f"删除失败: {filepath}, 错误: {e}")
        else:
            # Only count the bytes as freed when the delete succeeded.
            total_size -= size
            print(f"清理缓存: {filepath}")
# Register the cleanup jobs with the schedule library.
schedule.every().day.at("03:00").do(cleanup_temp_files)  # every day at 03:00
schedule.every().monday.at("04:00").do(cleanup_old_logs)  # every Monday at 04:00
schedule.every(6).hours.do(cleanup_cache_directory)  # every 6 hours

# Run forever, executing whichever jobs have become due.
print("定时清理任务已启动...")
while True:
    schedule.run_pending()
    time.sleep(60)  # poll for due jobs once a minute
方法二:使用 APScheduler(功能更强大的定时任务库)
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.cron import CronTrigger
from apscheduler.triggers.interval import IntervalTrigger
import os
import shutil
from datetime import datetime, timedelta
import logging
# Configure logging: INFO and above on the root logger, plus a module-level
# logger shared by the cleanup code below.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class FileCleaner:
    """Clean up temp, log, cache and download directories named in a config dict.

    Recognised config keys (defaults live in ``DEFAULT_CONFIG``):

    * ``temp_dir`` / ``temp_max_days``: tree walked recursively; files whose
      mtime is older than ``temp_max_days`` days are removed, then empty
      sub-directories are removed.
    * ``log_dir`` / ``log_max_days``: top-level files older than
      ``log_max_days`` days are removed.
    * ``cache_dir`` / ``cache_max_mb``: oldest files in the tree are removed
      until the total size is at most ``cache_max_mb`` MiB.
    * ``downloads_dir``: top-level files carrying a partial-download
      extension are removed.

    Every method is best-effort: a failure on one path is logged through
    the module-level ``logger`` and processing continues.
    """

    # Values used for any key the caller's config does not supply.
    DEFAULT_CONFIG = {
        'temp_dir': '/tmp/old_files',
        'log_dir': './logs',
        'cache_dir': './cache',
        'temp_max_days': 7,
        'log_max_days': 30,
        'cache_max_mb': 500,
        'downloads_dir': './downloads',
    }

    # Extensions treated as unfinished downloads by cleanup_downloads().
    _PARTIAL_DOWNLOAD_EXTS = ('.tmp', '.part', '.crdownload', '.download')

    def __init__(self, config=None):
        """Store the configuration.

        Args:
            config: Optional dict overriding any subset of ``DEFAULT_CONFIG``.
                It is merged over the defaults (and copied), so a partial
                dict no longer causes ``KeyError`` in the cleanup methods.
        """
        self.config = {**self.DEFAULT_CONFIG, **(config or {})}

    def cleanup_temp_files(self):
        """Remove expired files under ``temp_dir``, then prune empty sub-directories."""
        temp_dir = self.config['temp_dir']
        max_days = self.config['temp_max_days']
        if not os.path.exists(temp_dir):
            logger.info(f"临时目录不存在: {temp_dir}")
            return

        cutoff_ts = (datetime.now() - timedelta(days=max_days)).timestamp()
        cleaned_count = 0
        for root, _dirs, files in os.walk(temp_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                try:
                    if os.path.getmtime(filepath) < cutoff_ts:
                        os.remove(filepath)
                        cleaned_count += 1
                        logger.debug(f"删除文件: {filepath}")
                except OSError as e:
                    logger.error(f"删除失败: {filepath}, 错误: {e}")

        # Bottom-up walk so a directory emptied by removing its children
        # is itself seen as empty when its parent is visited.
        for root, dirs, _files in os.walk(temp_dir, topdown=False):
            for dir_name in dirs:
                dir_path = os.path.join(root, dir_name)
                try:
                    if not os.listdir(dir_path):
                        os.rmdir(dir_path)
                        logger.info(f"删除空目录: {dir_path}")
                except OSError as e:
                    logger.error(f"删除目录失败: {dir_path}, 错误: {e}")

        logger.info(f"临时文件清理完成,共清理 {cleaned_count} 个文件")

    def cleanup_old_logs(self):
        """Remove top-level files in ``log_dir`` older than ``log_max_days`` days.

        Every regular file is eligible, whatever its extension.
        """
        log_dir = self.config['log_dir']
        max_days = self.config['log_max_days']
        if not os.path.exists(log_dir):
            return

        cutoff_ts = (datetime.now() - timedelta(days=max_days)).timestamp()
        cleaned_count = 0
        for filename in os.listdir(log_dir):
            filepath = os.path.join(log_dir, filename)
            if not os.path.isfile(filepath):
                continue
            try:
                if os.path.getmtime(filepath) < cutoff_ts:
                    os.remove(filepath)
                    cleaned_count += 1
                    logger.debug(f"删除日志: {filepath}")
            except OSError as e:
                logger.error(f"删除日志失败: {filepath}, 错误: {e}")

        logger.info(f"日志清理完成,共清理 {cleaned_count} 个文件")

    def cleanup_cache(self):
        """Delete the oldest files under ``cache_dir`` until it fits ``cache_max_mb``."""
        cache_dir = self.config['cache_dir']
        max_size = self.config['cache_max_mb'] * 1024 * 1024  # MiB -> bytes
        if not os.path.exists(cache_dir):
            return

        total_size = 0
        files_info = []  # (path, size in bytes, mtime)
        for root, _dirs, files in os.walk(cache_dir):
            for filename in files:
                filepath = os.path.join(root, filename)
                try:
                    size = os.path.getsize(filepath)
                    mtime = os.path.getmtime(filepath)
                except OSError as e:
                    logger.error(f"读取文件信息失败: {filepath}, 错误: {e}")
                    continue
                files_info.append((filepath, size, mtime))
                total_size += size

        # Bound before the size check so the summary line below is valid
        # even when the cache is already within its budget.
        deleted_size = 0
        if total_size > max_size:
            files_info.sort(key=lambda info: info[2])  # oldest first
            for filepath, size, _ in files_info:
                try:
                    os.remove(filepath)
                except OSError as e:
                    logger.error(f"删除缓存文件失败: {filepath}, 错误: {e}")
                    continue
                deleted_size += size
                total_size -= size
                logger.debug(f"清理缓存: {filepath}")
                if total_size <= max_size:
                    break

        logger.info(f"缓存清理完成,释放 {deleted_size / 1024 / 1024:.2f}MB")

    def cleanup_downloads(self):
        """Remove unfinished-download files from the top level of ``downloads_dir``.

        A file qualifies when its lower-cased extension is one of
        ``.tmp``, ``.part``, ``.crdownload`` or ``.download``.
        """
        downloads_dir = self.config['downloads_dir']
        if not os.path.exists(downloads_dir):
            return

        cleaned_count = 0
        for filename in os.listdir(downloads_dir):
            filepath = os.path.join(downloads_dir, filename)
            if not os.path.isfile(filepath):
                continue
            ext = os.path.splitext(filename)[1].lower()
            if ext not in self._PARTIAL_DOWNLOAD_EXTS:
                continue
            # NOTE(review): file age is not checked here, so a download
            # still in progress would also match. Confirm that is acceptable.
            try:
                os.remove(filepath)
                cleaned_count += 1
                logger.info(f"删除未完成下载: {filepath}")
            except OSError as e:
                logger.error(f"删除失败: {filepath}, 错误: {e}")

        logger.info(f"下载目录清理完成,共清理 {cleaned_count} 个文件")
def check_disk_space(path='/', threshold_percent=90):
    """Log disk usage for ``path`` and trigger an emergency cleanup when it is too full.

    Args:
        path: Mount point or directory whose filesystem is inspected.
        threshold_percent: Usage percentage above which the module-level
            ``cleaner`` is asked to purge temp files and the cache.

    Requires the third-party ``psutil`` package, imported when the
    function is called.
    """
    import psutil

    disk = psutil.disk_usage(path)
    free_space_gb = disk.free / (1024 ** 3)
    total_space_gb = disk.total / (1024 ** 3)
    usage_percent = disk.percent
    logger.info(f"磁盘空间: 总空间 {total_space_gb:.2f}GB, "
                f"已用 {usage_percent}%, "
                f"剩余 {free_space_gb:.2f}GB")

    # Above the threshold, run the two cleanups without waiting for
    # their regular schedule.
    if usage_percent > threshold_percent:
        logger.warning(f"磁盘使用率超过{threshold_percent}%,触发紧急清理")
        cleaner.cleanup_temp_files()
        cleaner.cleanup_cache()
# The keep-alive loop at the bottom calls time.sleep(), but this script's
# import list does not include `time`, so bring it into scope here.
import time

# Create the cleaner instance shared by every job (and by check_disk_space).
cleaner = FileCleaner()

# Create the scheduler; BackgroundScheduler runs jobs off the main thread.
scheduler = BackgroundScheduler()

# Register the jobs.
scheduler.add_job(
    cleaner.cleanup_temp_files,
    CronTrigger(hour=3, minute=0),  # every day at 03:00
    id='temp_cleanup',
    name='临时文件清理'
)
scheduler.add_job(
    cleaner.cleanup_old_logs,
    CronTrigger(day_of_week='mon', hour=4, minute=0),  # every Monday at 04:00
    id='log_cleanup',
    name='日志清理'
)
scheduler.add_job(
    cleaner.cleanup_cache,
    IntervalTrigger(hours=6),  # every 6 hours
    id='cache_cleanup',
    name='缓存清理'
)
scheduler.add_job(
    cleaner.cleanup_downloads,
    IntervalTrigger(hours=12),  # every 12 hours
    id='download_cleanup',
    name='下载清理'
)
scheduler.add_job(
    check_disk_space,
    IntervalTrigger(hours=24),  # once a day
    id='disk_check',
    name='磁盘检查'
)

# Start the scheduler.
scheduler.start()
logger.info("定时清理任务已启动")

# Keep the main thread alive; shut the scheduler down on Ctrl-C or exit.
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()
    logger.info("定时清理任务已停止")
方法三:使用 crontab(Linux系统自带,最稳定)
# crontab_example.py
import os
import sys
import argparse
from datetime import datetime, timedelta
import logging
# Configure logging: timestamped INFO+ records are sent both to a log file
# and to a stream handler.
# NOTE(review): the file handler targets /var/log/cleanup.log, so the user
# running the cron job presumably needs write access there. Confirm.
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('/var/log/cleanup.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
def cleanup_old_files(directory, days, pattern='*'):
    """Delete files in ``directory`` older than ``days`` days that match ``pattern``.

    Args:
        directory: Directory to sweep (top level only).
        days: Retention window in days, judged by modification time.
        pattern: Glob pattern applied to file names inside ``directory``.

    Returns:
        The number of files removed. This is 0 when the directory does not
        exist, so callers can always format the result as a count.

    A failure on one file is logged and the sweep continues.
    """
    import glob

    if not os.path.exists(directory):
        logger.warning(f"目录不存在: {directory}")
        return 0

    cutoff_ts = (datetime.now() - timedelta(days=days)).timestamp()
    cleaned_count = 0
    # Escape the directory part so characters such as "[" in the path are
    # taken literally; only `pattern` is interpreted as a glob.
    search = os.path.join(glob.escape(directory), pattern)
    for filepath in glob.glob(search):
        if not os.path.isfile(filepath):
            continue
        try:
            if os.path.getmtime(filepath) < cutoff_ts:
                os.remove(filepath)
                cleaned_count += 1
                logger.debug(f"删除: {filepath}")
        except OSError as e:
            logger.error(f"删除失败: {filepath}, 错误: {e}")
    return cleaned_count
def main():
    """Command-line entry point: run one cleanup action chosen by ``--action``.

    Options:
        --action     One of ``temp``, ``logs`` or ``cache`` (required).
        --days       Retention window in days (default 7).
        --directory  Directory to clean; each action has its own default.
    """
    # action -> (default directory, glob pattern, label used in the log line)
    actions = {
        'temp': ('/tmp/cleanup', '*', '清理临时文件'),
        'logs': ('/var/log/myapp', '*.log', '清理日志'),
        'cache': ('/var/cache/myapp', '*', '清理缓存'),
    }

    parser = argparse.ArgumentParser(description='定时清理任务')
    parser.add_argument('--action', choices=list(actions), required=True)
    parser.add_argument('--days', type=int, default=7, help='保留天数')
    parser.add_argument('--directory', help='目标目录')
    args = parser.parse_args()

    default_dir, pattern, label = actions[args.action]
    dir_path = args.directory or default_dir
    # Treat a None result as 0 so the summary never reads "None 个文件".
    count = cleanup_old_files(dir_path, args.days, pattern) or 0
    logger.info(f"{label}: {dir_path}, 共清理 {count} 个文件")


if __name__ == '__main__':
    main()
然后在Linux中配置crontab:
# 编辑crontab
crontab -e

# 添加以下内容
# 每天凌晨3点清理临时文件
0 3 * * * /usr/bin/python3 /path/to/crontab_example.py --action temp --days 7
# 每周一凌晨4点清理日志
0 4 * * 1 /usr/bin/python3 /path/to/crontab_example.py --action logs --days 30
# 每6小时清理缓存
0 */6 * * * /usr/bin/python3 /path/to/crontab_example.py --action cache --days 1
安装依赖
# 方法一:安装schedule
pip install schedule
# 方法二:安装APScheduler
pip install apscheduler
# 磁盘检查需要psutil
pip install psutil
使用建议
- 选择合适的方法:
  - 简单场景:使用 schedule
  - 复杂场景:使用 APScheduler
  - 生产环境:优先使用系统 crontab
- 注意事项:
  - 设置合理的清理周期,避免频繁I/O操作
  - 记录清理日志,方便问题排查
  - 添加异常处理,避免清理任务中断
  - 先测试再部署,避免误删重要文件
- 最佳实践:
  - 明确指定要清理的文件类型和目录
  - 设置合理的保留时间
  - 保留最近几天的文件用于故障排查
  - 考虑添加文件大小限制
  - 定期检查清理效果