Python案例怎么清理过期数据?

wen python案例 24

本文目录导读:

Python案例怎么清理过期数据?

  1. 清理文件中的过期数据
  2. 清理数据库中的过期记录
  3. 清理Redis中的过期数据(过期键自动驱逐)
  4. 清理内存中的缓存数据(带过期时间)
  5. 综合案例:定时清理任务
  6. 核心要点总结

在Python中清理过期数据有多种方法,具体取决于你的数据存储方式(文件、数据库、内存等),以下是几种常见场景的代码案例:

清理文件中的过期数据

案例:清理日志文件(基于时间戳)

import os
import time
from datetime import datetime, timedelta
def clean_expired_logs(log_dir, retention_days=30):
    """
    清理N天前的日志文件
    """
    now = time.time()
    retention_seconds = retention_days * 86400  # 1天=86400秒
    for filename in os.listdir(log_dir):
        filepath = os.path.join(log_dir, filename)
        if os.path.isfile(filepath):
            # 获取文件最后修改时间
            file_time = os.path.getmtime(filepath)
            if now - file_time > retention_seconds:
                os.remove(filepath)
                print(f"已删除过期日志: {filename}")
# 使用示例
clean_expired_logs("/var/log/myapp", retention_days=7)

清理数据库中的过期记录

案例1:SQLite数据库清理

import sqlite3
from datetime import datetime, timedelta
def clean_expired_orders(db_path):
    """
    清理超过30天的订单记录
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # 计算30天前的日期
    cutoff_date = datetime.now() - timedelta(days=30)
    # 删除过期数据
    cursor.execute("""
        DELETE FROM orders 
        WHERE created_at < ?
    """, (cutoff_date.strftime('%Y-%m-%d %H:%M:%S'),))
    deleted_count = cursor.rowcount
    conn.commit()
    conn.close()
    print(f"已清理 {deleted_count} 条过期订单")
# 使用示例
clean_expired_orders("shop.db")

案例2:MySQL数据库清理(批量处理防阻塞)

import pymysql
from datetime import datetime, timedelta
def clean_expired_data_batch(host, user, password, db, table, date_column, retention_days=90, batch_size=1000):
    """
    批量清理过期数据,避免大事务阻塞
    """
    cutoff_date = datetime.now() - timedelta(days=retention_days)
    conn = pymysql.connect(host=host, user=user, password=password, db=db)
    cursor = conn.cursor()
    total_deleted = 0
    while True:
        # 每次删除一小批
        cursor.execute(f"""
            DELETE FROM {table} 
            WHERE {date_column} < %s 
            LIMIT %s
        """, (cutoff_date, batch_size))
        deleted = cursor.rowcount
        total_deleted += deleted
        conn.commit()
        if deleted < batch_size:  # 没有更多数据了
            break
        print(f"已删除 {total_deleted} 条记录...")
    print(f"清理完成,共删除 {total_deleted} 条过期数据")
    cursor.close()
    conn.close()
# 使用示例
clean_expired_data_batch(
    host='localhost',
    user='root',
    password='password',
    db='myapp',
    table='sessions',
    date_column='last_active',
    retention_days=7
)

清理Redis中的过期数据(过期键自动驱逐)

Redis本身有自动过期机制,但可以手动清理过期的键:

import redis
from datetime import datetime, timedelta
def clean_expired_redis_keys(redis_client, pattern="*", max_scan=1000):
    """
    主动清理Redis中的过期键
    注意:Redis 6.0+ 有主动过期机制,通常不需要手动清理
    """
    cursor = '0'
    total_deleted = 0
    while cursor != 0:
        # SCAN扫描键
        cursor, keys = redis_client.scan(cursor=cursor, match=pattern, count=max_scan)
        # 检查每个键的TTL
        for key in keys:
            ttl = redis_client.ttl(key)
            if ttl == -1:  # 没有过期时间
                print(f"键 {key.decode()} 没有过期时间,是否需要设置?")
            elif ttl == -2:  # 键已不存在
                total_deleted += 1
    print(f"清理完成,共发现 {total_deleted} 个已过期键")

清理内存中的缓存数据(带过期时间)

import time
from collections import OrderedDict
class ExpiringCache:
    """
    带过期时间的内存缓存
    """
    def __init__(self, default_ttl=300):  # 默认5分钟过期
        self.cache = OrderedDict()
        self.expiry = {}  # key -> 过期时间戳
        self.default_ttl = default_ttl
    def set(self, key, value, ttl=None):
        """设置缓存并记录过期时间"""
        if ttl is None:
            ttl = self.default_ttl
        self.cache[key] = value
        self.expiry[key] = time.time() + ttl
        # 清理过期数据(懒清理)
        self._lazy_cleanup()
    def get(self, key):
        """获取缓存,自动检查过期"""
        if key in self.cache:
            if time.time() > self.expiry.get(key, 0):
                # 已过期
                self._remove(key)
                return None
            return self.cache[key]
        return None
    def _lazy_cleanup(self):
        """懒清理:删除所有过期数据"""
        now = time.time()
        expired_keys = [
            k for k, exp in self.expiry.items() 
            if now > exp
        ]
        for key in expired_keys:
            self._remove(key)
    def _remove(self, key):
        """删除指定键"""
        self.cache.pop(key, None)
        self.expiry.pop(key, None)
# 使用示例
cache = ExpiringCache(default_ttl=60)
cache.set("user_123", {"name": "Alice"})
# 60秒后自动过期

综合案例:定时清理任务

import schedule
import time
from datetime import datetime
def cleanup_job():
    """
    定时清理任务:每天凌晨2点执行
    """
    print(f"[{datetime.now()}] 开始执行清理任务...")
    # 1. 清理日志文件
    clean_expired_logs("/var/log/myapp", retention_days=30)
    # 2. 清理数据库
    clean_expired_data_batch(
        host='localhost', user='root', password='password',
        db='myapp', table='sessions', retention_days=7
    )
    # 3. 清理其他地方...
    print(f"[{datetime.now()}] 清理任务完成")
# 每天凌晨2:00执行
schedule.every().day.at("02:00").do(cleanup_job)
# 也可以每1小时执行一次
# schedule.every(1).hour.do(cleanup_job)
# 启动定时任务
while True:
    schedule.run_pending()
    time.sleep(60)  # 每分钟检查一次

核心要点总结

  1. 文件清理:使用 os.path.getmtime() 获取文件修改时间
  2. 数据库清理:用 DELETE WHERE date < cutoff 语句,注意分批处理
  3. Redis清理:Redis自动过期通常足够,可以用 SCAN + TTL 检查
  4. 内存缓存:实现懒清理机制,在访问时检查过期
  5. 定时任务:推荐使用 scheduleAPScheduler

重要提醒

  • 清理前建议先做数据备份
  • WHERE + LIMIT分批删除,避免大事务
  • 生产环境先在测试环境验证清理逻辑
  • 添加日志记录清理详情,便于排查问题

抱歉,评论功能暂时关闭!