Python案例如何清理本地缓存?

wen python案例 57

本文目录导读:

Python案例如何清理本地缓存?

  1. 清理Python自身的缓存(pycache
  2. 清理requests库的DNS缓存
  3. 清理lru_cache装饰器缓存
  4. 清理自定义缓存文件
  5. 清理内存缓存(简单字典缓存)
  6. 清理数据库查询缓存
  7. 综合缓存清理工具
  8. 注意事项

在Python中清理本地缓存通常涉及多种场景,以下是常见的Python缓存类型及其清理方法:

清理Python自身的缓存(pycache

单目录清理

import os
import shutil
def clean_pycache(path="."):
    """删除指定目录下的所有__pycache__文件夹"""
    for root, dirs, files in os.walk(path):
        if "__pycache__" in dirs:
            cache_path = os.path.join(root, "__pycache__")
            print(f"删除: {cache_path}")
            shutil.rmtree(cache_path)
            dirs.remove("__pycache__")  # 避免重复遍历
clean_pycache()

使用命令行

# Linux/Mac
find . -type d -name "__pycache__" -exec rm -rf {} +
# Windows PowerShell
Get-ChildItem -Path . -Recurse -Directory -Filter "__pycache__" | Remove-Item -Recurse -Force

清理requests库的DNS缓存

import requests
from requests_cache import clear
def clean_requests_cache():
    """清理requests的DNS缓存和会话缓存"""
    # 清理DNS缓存
    requests.packages.urllib3.disable_warnings()
    # 如果使用了requests-cache
    try:
        clear()
        print("requests缓存已清理")
    except:
        print("未检测到requests-cache")
clean_requests_cache()

清理lru_cache装饰器缓存

from functools import lru_cache
@lru_cache(maxsize=128)
def expensive_function(n):
    return n * 2
def clean_lru_cache():
    """清理函数缓存"""
    expensive_function.cache_clear()
    print(f"lru缓存已清理,当前缓存信息: {expensive_function.cache_info()}")
clean_lru_cache()

清理自定义缓存文件

import os
import json
import pickle
from datetime import datetime, timedelta
class CacheManager:
    def __init__(self, cache_dir="cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    def clean_expired_cache(self, max_age_hours=24):
        """清理过期的缓存文件"""
        now = datetime.now()
        cleaned = 0
        for filename in os.listdir(self.cache_dir):
            filepath = os.path.join(self.cache_dir, filename)
            if os.path.isfile(filepath):
                # 获取文件修改时间
                mtime = datetime.fromtimestamp(os.path.getmtime(filepath))
                age = now - mtime
                if age > timedelta(hours=max_age_hours):
                    os.remove(filepath)
                    cleaned += 1
                    print(f"删除过期缓存: {filename}")
        print(f"共清理了 {cleaned} 个过期缓存文件")
    def clean_all(self):
        """清理所有缓存文件"""
        for filename in os.listdir(self.cache_dir):
            filepath = os.path.join(self.cache_dir, filename)
            if os.path.isfile(filepath):
                os.remove(filepath)
        print("所有缓存文件已清理")
# 使用示例
cache_mgr = CacheManager()
cache_mgr.clean_expired_cache(max_age_hours=48)  # 清理48小时前的缓存
# cache_mgr.clean_all()  # 清理所有缓存

清理内存缓存(简单字典缓存)

import time
import threading
class MemoryCache:
    def __init__(self):
        self.cache = {}
        self.expiry_times = {}
    def set(self, key, value, expiry_seconds=300):
        """设置缓存,带过期时间"""
        self.cache[key] = value
        self.expiry_times[key] = time.time() + expiry_seconds
    def get(self, key):
        """获取缓存,如果过期则返回None"""
        if key in self.cache:
            if time.time() < self.expiry_times[key]:
                return self.cache[key]
            else:
                self.delete(key)
        return None
    def delete(self, key):
        """删除单个缓存"""
        if key in self.cache:
            del self.cache[key]
            del self.expiry_times[key]
    def clean_expired(self):
        """清理所有过期缓存"""
        now = time.time()
        expired_keys = [k for k, v in self.expiry_times.items() if now >= v]
        for key in expired_keys:
            self.delete(key)
        if expired_keys:
            print(f"清理了 {len(expired_keys)} 个过期缓存")
    def clean_all(self):
        """清理所有缓存"""
        self.cache.clear()
        self.expiry_times.clear()
        print("所有内存缓存已清理")
# 使用示例
cache = MemoryCache()
cache.set("user_1", {"name": "张三"}, expiry_seconds=60)

清理数据库查询缓存

# SQLite 缓存清理
import sqlite3
def clean_sqlite_cache(db_path="database.db"):
    """清理SQLite的查询缓存"""
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    # 清理缓存的查询计划
    cursor.execute("PRAGMA query_cache_size = 0;")
    # 清理wal日志
    cursor.execute("PRAGMA wal_checkpoint(TRUNCATE);")
    # 重新设置缓存大小
    cursor.execute("PRAGMA query_cache_size = -2000;")  # 约2MB
    conn.close()
    print("SQLite查询缓存已清理")
clean_sqlite_cache()

综合缓存清理工具

import os
import platform
import subprocess
def clean_system_cache():
    """根据操作系统清理系统级缓存(需管理员权限)"""
    system = platform.system().lower()
    if system == "windows":
        # Windows:清理DNS缓存
        subprocess.run(["ipconfig", "/flushdns"], capture_output=True)
    elif system == "linux":
        # Linux:清理内存缓存
        os.system("echo 3 > /proc/sys/vm/drop_caches")
    elif system == "darwin":
        # macOS:清理DNS缓存
        subprocess.run(["sudo", "killall", "-HUP", "mDNSResponder"], capture_output=True)
    print(f"{system} 系统缓存清理完成(可能需要管理员权限)")
# 安全清理(需要用户确认)
def safe_clean_cache(cache_path="cache"):
    """安全清理缓存,带备份功能"""
    import shutil
    from datetime import datetime
    if os.path.exists(cache_path):
        # 创建备份
        backup_path = f"{cache_path}_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
        shutil.copytree(cache_path, backup_path)
        print(f"缓存已备份到: {backup_path}")
        # 清理
        shutil.rmtree(cache_path)
        os.makedirs(cache_path)
        print("缓存清理完成")
    else:
        print("缓存目录不存在")
# 使用示例
if __name__ == "__main__":
    # 1. 清理Python缓存文件
    clean_pycache()
    # 2. 清理内存缓存
    cache = MemoryCache()
    cache.clean_all()
    # 3. 清理自定义缓存文件
    cache_mgr = CacheManager()
    cache_mgr.clean_all()

注意事项

  1. 权限问题:系统级缓存清理可能需要管理员/root权限
  2. 数据安全:清理前确保重要数据已持久化
  3. 性能影响:大规模清理时可能短暂影响应用性能
  4. 缓存策略:考虑使用LRU、TTL等策略自动清理

根据具体使用场景选择合适的缓存清理方法,建议将清理操作用作维护工具而非应用核心逻辑的一部分。

抱歉,评论功能暂时关闭!