Python案例如何实现数据采样?

wen python案例 3

本文目录导读:

Python案例如何实现数据采样?

  1. 基本随机采样
  2. 数据框(DataFrame)采样
  3. 实际应用案例
  4. 采样方法对比
  5. 实用技巧总结

我来为你介绍Python中数据采样的几种常见实现方法。

基本随机采样

使用random模块

import random
# 创建一个样本数据
data = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
# 简单随机采样(不放回)
sample_size = 3
random_sample = random.sample(data, sample_size)
print(f"随机采样(不放回): {random_sample}")
# 简单随机采样(放回)
random_choices = random.choices(data, k=3)
print(f"随机采样(放回): {random_choices}")

使用numpy库

import numpy as np
# 创建数据
data = np.array([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
# 随机采样
sample_indices = np.random.choice(len(data), size=3, replace=False)
numpy_sample = data[sample_indices]
print(f"Numpy随机采样: {numpy_sample}")
# 带权重的采样
weights = [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]
weighted_sample = np.random.choice(data, size=3, p=weights)
print(f"带权重的采样: {weighted_sample}")

数据框(DataFrame)采样

pandas库实现

import pandas as pd
import numpy as np
# 创建示例数据框
df = pd.DataFrame({
    'id': range(1, 101),
    'name': [f'User_{i}' for i in range(1, 101)],
    'age': np.random.randint(18, 65, 100),
    'salary': np.random.randint(3000, 20000, 100)
})
# 简单的随机采样
sample_df = df.sample(n=10)  # 采样10条数据
print("随机采样10条数据:")
print(sample_df)
# 按比例采样
sample_frac = df.sample(frac=0.2)  # 采样20%的数据
print(f"\n按比例采样(20%): 共{len(sample_frac)}条")
# 分层采样
# 按年龄分组,每组采样2条
stratified_sample = df.groupby(pd.cut(df['age'], bins=[18, 30, 40, 50, 65])).apply(
    lambda x: x.sample(n=2, random_state=42)
).reset_index(drop=True)
print(f"\n分层采样结果(按年龄分层):")
print(stratified_sample)

实际应用案例

案例1:用户调查数据采样

import pandas as pd
import numpy as np
# 模拟用户数据
np.random.seed(42)
n_users = 10000
user_data = pd.DataFrame({
    'user_id': range(1, n_users + 1),
    'age_group': np.random.choice(['18-25', '26-35', '36-45', '46-55', '55+'], n_users),
    'city': np.random.choice(['北京', '上海', '广州', '深圳', '杭州'], n_users),
    'is_vip': np.random.choice([True, False], n_users, p=[0.3, 0.7]),
    'spending': np.random.exponential(scale=500, size=n_users)
})
print("用户数据概览:")
print(user_data.head())
print(f"用户总数: {len(user_data)}")
# 分层采样确保各年龄段和城市都有代表
def stratified_sample_by_groups(df, n_per_group=50):
    """按多个特征进行分层采样"""
    sample = df.groupby(['age_group', 'city']).apply(
        lambda x: x.sample(n=min(n_per_group, len(x)), random_state=42)
    ).reset_index(drop=True)
    return sample
survey_sample = stratified_sample_by_groups(user_data, n_per_group=10)
print(f"\n分层采样后的用户数量: {len(survey_sample)}")
print("\n各年龄段采样数量:")
print(survey_sample['age_group'].value_counts())

案例2:大数据集随机采样

import pandas as pd
import numpy as np
# 模拟大型数据集
def create_large_dataset():
    """创建大型数据集的函数"""
    n_rows = 1000000
    data = {
        'feature1': np.random.randn(n_rows),
        'feature2': np.random.randn(n_rows),
        'feature3': np.random.randint(1, 10, n_rows),
        'target': np.random.choice([0, 1], n_rows)
    }
    return pd.DataFrame(data)
# 创建大数据集
large_df = create_large_dataset()
print(f"数据集大小: {large_df.shape}")
# 高效采样方法
# 方法1:使用sample方法
sampled_df = large_df.sample(n=1000, random_state=42)
print(f"\n随机采样1000条数据: {sampled_df.shape}")
# 方法2:使用行号采样(适合非常大的数据集)
row_indices = np.random.choice(len(large_df), size=5000, replace=False)
sampled_by_index = large_df.iloc[row_indices]
print(f"行索引采样5000条: {sampled_by_index.shape}")
# 方法3:系统采样(每隔K条取一条)
k = len(large_df) // 1000  # 采样1000条
systematic_sample = large_df.iloc[::k].head(1000)
print(f"系统采样1000条: {systematic_sample.shape}")

案例3:时间序列数据采样

import pandas as pd
import numpy as np
# 模拟时间序列数据
date_rng = pd.date_range(start='2024-01-01', end='2024-12-31', freq='1H')
time_series_data = pd.DataFrame(date_rng, columns=['timestamp'])
time_series_data['value'] = np.random.randn(len(date_rng))
print(f"原始数据长度: {len(time_series_data)}")
# 时间序列采样
# 1. 按固定时间间隔采样
hourly_sample = time_series_data.iloc[::6]  # 每6小时采样一次
print(f"每6小时采样: {len(hourly_sample)}条")
# 2. 使用resample进行时间重采样
time_series_data.set_index('timestamp', inplace=True)
daily_sample = time_series_data.resample('1D').mean()  # 每日平均值
print(f"每日平均值采样: {len(daily_sample)}条")
# 3. 随机时间点采样
random_times = pd.date_range(start='2024-06-01', end='2024-06-30', periods=10)
random_time_sample = time_series_data.loc[random_times]
print(f"随机时间点采样: {len(random_time_sample)}条")

采样方法对比

import time
import pandas as pd
import numpy as np
# 构建测试数据
test_data = pd.DataFrame({
    'id': range(1, 100001),
    'value': np.random.randn(100000),
    'group': np.random.choice(['A', 'B', 'C', 'D'], 100000)
})
def compare_sampling_methods(data, sample_size=1000):
    """比较不同采样方法的性能"""
    methods = {}
    # 方法1:sample方法
    start = time.time()
    sample1 = data.sample(n=sample_size, random_state=42)
    methods['sample方法'] = time.time() - start
    # 方法2:iloc + random choice
    start = time.time()
    indices = np.random.choice(len(data), sample_size, replace=False)
    sample2 = data.iloc[indices]
    methods['iloc+随机索引'] = time.time() - start
    # 方法3:布尔索引
    start = time.time()
    mask = np.zeros(len(data), dtype=bool)
    mask[:sample_size] = True
    np.random.shuffle(mask)
    sample3 = data[mask]
    methods['布尔索引'] = time.time() - start
    return methods
# 执行性能对比
results = compare_sampling_methods(test_data)
print("采样方法性能对比(秒):")
for method, time_taken in results.items():
    print(f"{method}: {time_taken:.4f}s")

实用技巧总结

# 采样时常用技巧
# 1. 设置随机种子确保可重复性
import random
random.seed(42)
np.random.seed(42)
# 2. 分层采样确保代表性
df_grouped = df.groupby('category').apply(
    lambda x: x.sample(min(10, len(x)), random_state=42)
)
# 3. 加权采样
weights = [0.8 if x == 'important' else 0.2 for x in df['type']]
weighted_sample = df.sample(n=100, weights=weights)
# 4. 条件采样
condition = df['age'] > 30
filtered_sample = df[condition].sample(n=50)
# 5. 批量处理大型数据集
def batch_sample(dataframe, batch_size=10000, sample_size=1000):
    """批处理采样,适合超大文件"""
    total_rows = len(dataframe)
    indices = np.random.choice(total_rows, sample_size, replace=False)
    return dataframe.iloc[indices]
# 6. 处理缺失值采样
def safe_sample_without_nans(dataframe, sample_size):
    """确保采样结果没有缺失值"""
    clean_data = dataframe.dropna()
    return clean_data.sample(n=min(sample_size, len(clean_data)))
  1. 选择合适的方法:小数据集用random.sample(),大数据集用np.random.choice()df.sample()
  2. 考虑采样偏差:对于分类数据,使用分层采样保持原分布
  3. 性能优化:处理大数据时,考虑系统采样或批处理
  4. 可重复性:始终设置随机种子(random_stateseed
  5. 验证采样结果:检查采样后的数据分布是否与原始数据一致

这些方法涵盖了Python数据采样的主要场景,你可以根据具体需求选择最合适的实现方式。

抱歉,评论功能暂时关闭!