Python案例怎么修复异常数据?

wen python案例 28

本文目录导读:

Python案例怎么修复异常数据?

  1. 案例:修复销售数据中的异常值
  2. 常用的异常修复方法总结
  3. 最佳实践建议

在Python中修复异常数据通常包含以下几个核心步骤:检测异常 → 分析原因 → 选择修复策略 → 实施修复

下面我将通过一个具体的案例,详细演示如何修复异常数据。

案例:修复销售数据中的异常值

假设我们有一个销售数据集,包含以下常见异常:

  • 负数的销售额
  • 超出正常范围的数值(如单笔销售100万)
  • 缺失值
  • 重复数据

基础环境与数据准备

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
# 创建包含异常数据的示例
data = {
    '交易ID': ['T001', 'T002', 'T003', 'T004', 'T005', 'T006', 'T007'],
    '商品名': ['手机', '电脑', '手机', '平板', '手机', '电脑', '手机'],
    '单价': [2000, 5000, -500, 3000, 8000, 5000, 2000],
    '数量': [1, 2, -3, 4, 100, 2, 1],
    '销售额': [2000, 10000, None, 12000, 800000, 10000, 2000],
    '日期': ['2024-01-01', '2024-01-02', '2024-01-03', '2024-01-04', 
             '2024-01-05', '2024-01-05', '2024-01-05']
}
df = pd.DataFrame(data)
print("原始数据:")
print(df)

异常数据检测

# 2.1 检测缺失值
print("\n缺失值检查:")
print(df.isnull().sum())
# 2.2 检测重复数据
print(f"\n重复行数:{df.duplicated().sum()}")
# 2.3 检测负值(不合理的数据)
print("\n负值检查(单价和数量不应为负):")
print(f"单价为负的行数:{(df['单价'] < 0).sum()}")
print(f"数量为负的行数:{(df['数量'] < 0).sum()}")
# 2.4 使用统计方法检测异常值(Z-Score法)
from scipy import stats
z_scores = np.abs(stats.zscore(df['销售额'].dropna()))
outliers = np.where(z_scores > 3)  # Z-Score > 3 视为异常
print(f"\n销售额异常值索引(Z-Score法):{outliers}")
# 2.5 使用IQR方法检测异常值
Q1 = df['销售额'].quantile(0.25)
Q3 = df['销售额'].quantile(0.75)
IQR = Q3 - Q1
lower_bound = Q1 - 1.5 * IQR
upper_bound = Q3 + 1.5 * IQR
print(f"IQR范围:{lower_bound:.0f} 到 {upper_bound:.0f}")

异常数据修复策略(核心部分)

def fix_sales_data(df):
    """综合修复销售数据的异常"""
    df_fixed = df.copy()
    # 3.1 处理缺失值
    # 用同类商品的均值填充
    df_fixed['销售额'] = df_fixed.groupby('商品名')['销售额'].transform(
        lambda x: x.fillna(x.mean())
    )
    # 3.2 处理负值(改为正数或0)
    df_fixed['单价'] = df_fixed['单价'].abs()  # 改为绝对值
    df_fixed['数量'] = df_fixed['数量'].apply(lambda x: max(x, 1))  # 至少为1
    # 3.3 处理数值异常(超出合理范围)
    # 设定合理范围:单价不超过100万,数量不超过100
    price_median = df_fixed['单价'].median()
    qty_median = df_fixed['数量'].median()
    df_fixed['单价'] = df_fixed['单价'].apply(
        lambda x: price_median if x > 1000000 or x < 10 else x
    )
    df_fixed['数量'] = df_fixed['数量'].apply(
        lambda x: qty_median if x > 100 else x
    )
    # 3.4 重新计算销售额(一致性修复)
    df_fixed['销售额'] = df_fixed['单价'] * df_fixed['数量']
    # 3.5 处理重复数据
    df_fixed = df_fixed.drop_duplicates()
    return df_fixed
# 执行修复
df_fixed = fix_sales_data(df)

更精细的异常检测与修复

class DataCleaner:
    """数据清洗器 - 更完善的方法"""
    def __init__(self, df):
        self.df = df.copy()
        self.error_log = []
    def check_and_fix_negatives(self, columns):
        """修复负值"""
        for col in columns:
            negative_mask = self.df[col] < 0
            if negative_mask.any():
                self.error_log.append(f"{col} 有 {negative_mask.sum()} 个负值")
                self.df[col] = self.df[col].abs()
        return self
    def check_and_fix_outliers(self, column, method='iqr', factor=1.5):
        """修复异常值"""
        if method == 'iqr':
            Q1 = self.df[column].quantile(0.25)
            Q3 = self.df[column].quantile(0.75)
            IQR = Q3 - Q1
            lower = Q1 - factor * IQR
            upper = Q3 + factor * IQR
            # 用中位数替换异常值
            median_val = self.df[column].median()
            self.df[column] = self.df[column].clip(lower, upper)
            outliers_count = ((self.df[column] < lower) | (self.df[column] > upper)).sum()
            if outliers_count > 0:
                self.error_log.append(f"{column} 修复了 {outliers_count} 个异常值")
        return self
    def fix_missing_values(self, strategy='mean'):
        """修复缺失值"""
        for col in self.df.columns:
            if self.df[col].isnull().any():
                self.error_log.append(f"{col} 有 {self.df[col].isnull().sum()} 个缺失值")
                if strategy == 'mean':
                    self.df[col].fillna(self.df[col].mean(), inplace=True)
                elif strategy == 'median':
                    self.df[col].fillna(self.df[col].median(), inplace=True)
                elif strategy == 'ffill':
                    self.df[col].fillna(method='ffill', inplace=True)
        return self
    def get_report(self):
        """获取清洗报告"""
        return {
            'errors': self.error_log,
            'original_shape': self.df.shape,
            'cleaned_shape': self.df.drop_duplicates().shape
        }
# 使用清洗器
cleaner = DataCleaner(df)
cleaner.check_and_fix_negatives(['单价', '数量'])
cleaner.check_and_fix_outliers('销售额')
cleaner.fix_missing_values('mean')
report = cleaner.get_report()
print("清洗报告:")
for error in report['errors']:
    print(f"  - {error}")

结果验证

# 修复前后对比
print("="*50)
print("修复前数据:")
print(df.describe())
print("\n修复后数据:")
print(df_fixed.describe())
print("\n修复详情:")
print(f"修复前行数:{len(df)},修复后行数:{len(df_fixed)}")
print(f"修复前缺失值:{df.isnull().sum().sum()},修复后缺失值:{df_fixed.isnull().sum().sum()}")
# 可视化对比
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
df['销售额'].hist(ax=axes[0], bins=10)
axes[0].set_title('修复前销售额分布')
axes[0].set_xlabel('销售额')
df_fixed['销售额'].hist(ax=axes[1], bins=10)
axes[1].set_title('修复后销售额分布')
axes[1].set_xlabel('销售额')
plt.tight_layout()
plt.show()

常用的异常修复方法总结

异常类型 检测方法 推荐修复策略
缺失值 isnull() 均值填充、中位数填充、插值法、模型预测
负值 < 0 取绝对值、设为0或其他默认值
数值异常 Z-Score, IQR 截断、替换为分位数、删除
重复数据 duplicated() 删除重复、保留最后一条
逻辑异常 业务规则校验 根据业务逻辑修正或删除

最佳实践建议

  1. 先备份原数据:修复前保留原始数据副本
  2. 记录日志:记录每次修改的原因和内容
  3. 业务验证:仅靠统计方法不够,需要结合业务知识
  4. 渐进式修复:先修复明显错误,再处理边缘情况
  5. 测试验证:修复后验证数据完整性和一致性

这个案例覆盖了最常见的异常数据修复场景,实际应用中,你需要根据具体的数据特征和业务需求选择合适的修复策略。

抱歉,评论功能暂时关闭!