本文目录导读:

在Python中清理过期数据有多种方法,具体取决于你的数据存储方式(文件、数据库、内存等),以下是几种常见场景的代码案例:
清理文件中的过期数据
案例:清理日志文件(基于时间戳)
import os
import time
from datetime import datetime, timedelta
def clean_expired_logs(log_dir, retention_days=30):
"""
清理N天前的日志文件
"""
now = time.time()
retention_seconds = retention_days * 86400 # 1天=86400秒
for filename in os.listdir(log_dir):
filepath = os.path.join(log_dir, filename)
if os.path.isfile(filepath):
# 获取文件最后修改时间
file_time = os.path.getmtime(filepath)
if now - file_time > retention_seconds:
os.remove(filepath)
print(f"已删除过期日志: {filename}")
# 使用示例
clean_expired_logs("/var/log/myapp", retention_days=7)
清理数据库中的过期记录
案例1:SQLite数据库清理
import sqlite3
from datetime import datetime, timedelta
def clean_expired_orders(db_path):
"""
清理超过30天的订单记录
"""
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# 计算30天前的日期
cutoff_date = datetime.now() - timedelta(days=30)
# 删除过期数据
cursor.execute("""
DELETE FROM orders
WHERE created_at < ?
""", (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))
deleted_count = cursor.rowcount
conn.commit()
conn.close()
print(f"已清理 {deleted_count} 条过期订单")
# 使用示例
clean_expired_orders("shop.db")
案例2:MySQL数据库清理(批量处理防阻塞)
import pymysql
from datetime import datetime, timedelta
def clean_expired_data_batch(host, user, password, db, table, date_column, retention_days=90, batch_size=1000):
"""
批量清理过期数据,避免大事务阻塞
"""
cutoff_date = datetime.now() - timedelta(days=retention_days)
conn = pymysql.connect(host=host, user=user, password=password, db=db)
cursor = conn.cursor()
total_deleted = 0
while True:
# 每次删除一小批
cursor.execute(f"""
DELETE FROM {table}
WHERE {date_column} < %s
LIMIT %s
""", (cutoff_date, batch_size))
deleted = cursor.rowcount
total_deleted += deleted
conn.commit()
if deleted < batch_size: # 没有更多数据了
break
print(f"已删除 {total_deleted} 条记录...")
print(f"清理完成,共删除 {total_deleted} 条过期数据")
cursor.close()
conn.close()
# 使用示例
clean_expired_data_batch(
host='localhost',
user='root',
password='password',
db='myapp',
table='sessions',
date_column='last_active',
retention_days=7
)
清理Redis中的过期数据(过期键自动驱逐)
Redis本身有自动过期机制,但可以手动清理过期的键:
import redis
from datetime import datetime, timedelta
def clean_expired_redis_keys(redis_client, pattern="*", max_scan=1000):
"""
主动清理Redis中的过期键
注意:Redis 6.0+ 有主动过期机制,通常不需要手动清理
"""
cursor = '0'
total_deleted = 0
while cursor != 0:
# SCAN扫描键
cursor, keys = redis_client.scan(cursor=cursor, match=pattern, count=max_scan)
# 检查每个键的TTL
for key in keys:
ttl = redis_client.ttl(key)
if ttl == -1: # 没有过期时间
print(f"键 {key.decode()} 没有过期时间,是否需要设置?")
elif ttl == -2: # 键已不存在
total_deleted += 1
print(f"清理完成,共发现 {total_deleted} 个已过期键")
清理内存中的缓存数据(带过期时间)
import time
from collections import OrderedDict
class ExpiringCache:
"""
带过期时间的内存缓存
"""
def __init__(self, default_ttl=300): # 默认5分钟过期
self.cache = OrderedDict()
self.expiry = {} # key -> 过期时间戳
self.default_ttl = default_ttl
def set(self, key, value, ttl=None):
"""设置缓存并记录过期时间"""
if ttl is None:
ttl = self.default_ttl
self.cache[key] = value
self.expiry[key] = time.time() + ttl
# 清理过期数据(懒清理)
self._lazy_cleanup()
def get(self, key):
"""获取缓存,自动检查过期"""
if key in self.cache:
if time.time() > self.expiry.get(key, 0):
# 已过期
self._remove(key)
return None
return self.cache[key]
return None
def _lazy_cleanup(self):
"""懒清理:删除所有过期数据"""
now = time.time()
expired_keys = [
k for k, exp in self.expiry.items()
if now > exp
]
for key in expired_keys:
self._remove(key)
def _remove(self, key):
"""删除指定键"""
self.cache.pop(key, None)
self.expiry.pop(key, None)
# 使用示例
cache = ExpiringCache(default_ttl=60)
cache.set("user_123", {"name": "Alice"})
# 60秒后自动过期
综合案例:定时清理任务
import schedule
import time
from datetime import datetime
def cleanup_job():
"""
定时清理任务:每天凌晨2点执行
"""
print(f"[{datetime.now()}] 开始执行清理任务...")
# 1. 清理日志文件
clean_expired_logs("/var/log/myapp", retention_days=30)
# 2. 清理数据库
clean_expired_data_batch(
host='localhost', user='root', password='password',
db='myapp', table='sessions', retention_days=7
)
# 3. 清理其他地方...
print(f"[{datetime.now()}] 清理任务完成")
# 每天凌晨2:00执行
schedule.every().day.at("02:00").do(cleanup_job)
# 也可以每1小时执行一次
# schedule.every(1).hour.do(cleanup_job)
# 启动定时任务
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
核心要点总结
- 文件清理:使用
os.path.getmtime()获取文件修改时间 - 数据库清理:用
DELETE WHERE date < cutoff语句,注意分批处理 - Redis清理:Redis自动过期通常足够,可以用
SCAN + TTL检查 - 内存缓存:实现懒清理机制,在访问时检查过期
- 定时任务:推荐使用
schedule或APScheduler库
重要提醒:
- 清理前建议先做数据备份
- 用WHERE + LIMIT分批删除,避免大事务
- 生产环境先在测试环境验证清理逻辑
- 添加日志记录清理详情,便于排查问题