本文目录导读:
- 线性回归分析(简单线性回归、多元线性回归)
- 从CSV文件导入数据
- 多项式回归
- 正则化回归(防止过拟合)
- 完整的数据分析流程
- 实际案例分析:房价预测

本文详细介绍在 Python 中实现数据回归分析的几种常见方法。
线性回归分析
1 简单线性回归
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.linear_model import LinearRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error, r2_score
import seaborn as sns
# Build a synthetic data set with a single feature and a known linear trend.
np.random.seed(42)
X = np.random.randn(100, 1) * 10  # feature column, shape (100, 1)
y = 3 * X + 5 + np.random.randn(100, 1) * 2  # target = 3x + 5 plus Gaussian noise

# Hold out 20% of the samples for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)

# Fit an ordinary least-squares model on the training split.
model = LinearRegression()
model.fit(X_train, y_train)

# Predict the held-out samples.
y_pred = model.predict(X_test)

# Report the fitted parameters and the test-set metrics.
# y is 2-D here, so coef_ is indexed as [0][0] and intercept_ as [0].
print(f"斜率 (coef): {model.coef_[0][0]:.4f}")
print(f"截距 (intercept): {model.intercept_[0]:.4f}")
print(f"R² 分数: {r2_score(y_test, y_pred):.4f}")
print(f"均方误差: {mean_squared_error(y_test, y_pred):.4f}")

# Plot the held-out points against the fitted line.
plt.figure(figsize=(10, 6))
plt.scatter(X_test, y_test, color='blue', label='实际值')
plt.plot(X_test, y_pred, color='red', linewidth=2, label='预测值')
plt.xlabel('特征 X')
plt.ylabel('目标 y')
# The title call had been truncated into the previous line (a syntax error).
plt.title('线性回归结果')
plt.legend()
plt.grid(True, alpha=0.3)
plt.show()
2 多元线性回归
# Build a synthetic data set with three features and known coefficients.
np.random.seed(42)
n_samples = 200

# Three feature columns; the target is a linear combination plus noise.
X_multi = np.random.randn(n_samples, 3) * 10
true_coef = [2, -1.5, 3]
y_multi = np.dot(X_multi, true_coef) + 5 + np.random.randn(n_samples) * 3

# Wrap everything in a DataFrame so it is easy to inspect.
df = pd.DataFrame(X_multi, columns=['feature1', 'feature2', 'feature3'])
df['target'] = y_multi

# Show the pairwise correlations as a heat map.
correlation_matrix = df.corr()
plt.figure(figsize=(8, 6))
sns.heatmap(correlation_matrix, annot=True, cmap='coolwarm', center=0)
# The title call had been truncated into the previous line (a syntax error).
plt.title('特征相关性热力图')
plt.show()

# Hold out 20% of the samples for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X_multi, y_multi, test_size=0.2, random_state=42
)

# Fit the multiple linear regression model.
multi_model = LinearRegression()
multi_model.fit(X_train, y_train)

# Predict the held-out samples and report the metrics.
y_pred_multi = multi_model.predict(X_test)
print("多元线性回归结果:")
print(f"系数: {multi_model.coef_}")
print(f"截距: {multi_model.intercept_:.4f}")
print(f"R² 分数: {r2_score(y_test, y_pred_multi):.4f}")
print(f"均方误差: {mean_squared_error(y_test, y_pred_multi):.4f}")

# Residual analysis: residuals vs. predictions (left) and histogram (right).
residuals = y_test - y_pred_multi
plt.figure(figsize=(12, 5))

plt.subplot(1, 2, 1)
plt.scatter(y_pred_multi, residuals, alpha=0.6)
plt.xlabel('预测值')
plt.ylabel('残差')
plt.title('残差图')  # restored: the call had been truncated
plt.axhline(y=0, color='r', linestyle='--')
plt.grid(True, alpha=0.3)

plt.subplot(1, 2, 2)
plt.hist(residuals, bins=20, edgecolor='black', alpha=0.7)
plt.xlabel('残差')
plt.ylabel('频数')
plt.title('残差分布')  # restored: the call had been truncated
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()
从CSV文件导入数据
# 假设有一个CSV文件包含数据
def perform_regression_analysis(csv_file, target_column):
    """Load a CSV file and run a linear regression on its columns.

    When ``csv_file`` cannot be read, a small synthetic data set is generated
    instead and ``target_column`` is overridden with ``'target'``.

    Args:
        csv_file: Path of the CSV file to load.
        target_column: Name of the column to predict; every other column is
            used as a feature.

    Returns:
        A ``(model, scaler)`` tuple: the fitted ``LinearRegression`` and the
        ``StandardScaler`` that was fitted on the feature columns.
    """
    from sklearn.preprocessing import StandardScaler

    # Keep the try body minimal: only reading the file is expected to fail.
    try:
        df = pd.read_csv(csv_file)
    except (OSError, ValueError):
        # OSError covers a missing or unreadable file; pandas' parsing errors
        # derive from ValueError. Unlike a bare ``except``, this lets
        # KeyboardInterrupt and SystemExit propagate.
        print("使用示例数据代替...")
        # The fallback target is driven by feature1 so the demo has an actual
        # relationship to fit (it used to be pure noise).
        feature1 = np.random.randn(100)
        df = pd.DataFrame({
            'feature1': feature1,
            'feature2': np.random.randn(100),
            'feature3': np.random.randn(100),
            'target': 2 + 3 * feature1 + np.random.randn(100) * 0.5,
        })
        target_column = 'target'
    else:
        print("数据预览:")
        print(df.head())
        print(f"\n数据形状: {df.shape}")
        print(f"列名: {list(df.columns)}")

    # Fill missing values with the column means before splitting, so the
    # features and the target are derived exactly once.
    if df.isnull().sum().sum() > 0:
        print("数据中存在缺失值,进行填充...")
        df = df.fillna(df.mean())

    # Separate the features from the target.
    X = df.drop(target_column, axis=1)
    y = df[target_column]

    # Standardise the features.
    scaler = StandardScaler()
    X_scaled = scaler.fit_transform(X)

    # Hold out 20% of the samples for evaluation.
    X_train, X_test, y_train, y_test = train_test_split(
        X_scaled, y, test_size=0.2, random_state=42
    )

    # Fit the model and predict the held-out samples.
    model = LinearRegression()
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Report the evaluation metrics.
    print("\n回归分析结果:")
    print(f"特征数量: {X.shape[1]}")
    print(f"训练集大小: {len(X_train)}")
    print(f"测试集大小: {len(X_test)}")
    print(f"R² 分数: {r2_score(y_test, y_pred):.4f}")
    print(f"均方误差: {mean_squared_error(y_test, y_pred):.4f}")

    # Coefficients of the standardised features, largest magnitude first.
    feature_importance = pd.DataFrame({
        'feature': X.columns,
        'coefficient': model.coef_,
    })
    print("\n特征系数:")
    print(feature_importance.sort_values('coefficient', key=abs, ascending=False))

    return model, scaler
# Run the analysis; generated sample data is used when 'data.csv' cannot be read.
model, scaler = perform_regression_analysis(
    csv_file='data.csv',
    target_column='target',
)
多项式回归
from sklearn.preprocessing import PolynomialFeatures
from sklearn.pipeline import make_pipeline
# Generate noisy samples of sin(x) with x drawn from [0, 10) and sorted.
np.random.seed(42)
X_poly = np.sort(np.random.rand(100, 1) * 10, axis=0)
y_poly = np.sin(X_poly).ravel() + np.random.randn(100) * 0.1

# Polynomial degrees to compare, one subplot per degree.
degrees = [1, 3, 5, 10]

# The evaluation grid is identical for every degree, so build it once
# instead of on each loop iteration.
X_test_poly = np.linspace(0, 10, 100).reshape(-1, 1)

plt.figure(figsize=(15, 10))
for i, degree in enumerate(degrees, 1):
    plt.subplot(2, 2, i)

    # Polynomial feature expansion followed by ordinary least squares.
    model_poly = make_pipeline(
        PolynomialFeatures(degree),
        LinearRegression(),
    )
    model_poly.fit(X_poly, y_poly)

    # Evaluate the fitted curve on the shared grid.
    y_pred_poly = model_poly.predict(X_test_poly)

    # Draw the samples and the fitted curve.
    plt.scatter(X_poly, y_poly, color='blue', alpha=0.6, label='实际数据')
    plt.plot(X_test_poly, y_pred_poly, color='red', label=f'多项式阶数={degree}')
    plt.xlabel('X')
    plt.ylabel('y')
    plt.title(f'多项式回归 (degree={degree})')
    plt.legend()
    plt.grid(True, alpha=0.3)

# Lay out and show the 2x2 figure once all subplots are drawn.
plt.tight_layout()
plt.show()
正则化回归(防止过拟合)
from sklearn.linear_model import Ridge, Lasso, ElasticNet
# Generate a 20-feature data set in which only the first five features matter.
np.random.seed(42)
n_features = 20
X_high = np.random.randn(200, n_features)
true_coef_high = np.zeros(n_features)
true_coef_high[:5] = [2, 1.5, -1, 0.5, 0.3]  # only these five are non-zero
y_high = np.dot(X_high, true_coef_high) + np.random.randn(200) * 0.5

# Hold out 20% of the samples for evaluation.
X_train, X_test, y_train, y_test = train_test_split(
    X_high, y_high, test_size=0.2, random_state=42
)

# Models to compare: no regularisation, L2, L1, and a mix of both.
models = {
    '线性回归': LinearRegression(),
    'Ridge (L2)': Ridge(alpha=1.0),
    'Lasso (L1)': Lasso(alpha=0.1),
    'ElasticNet': ElasticNet(alpha=0.1, l1_ratio=0.5),
}

results = []
plt.figure(figsize=(15, 10))
for i, (name, model) in enumerate(models.items(), 1):
    plt.subplot(2, 2, i)

    # Fit on the training split and predict the held-out samples.
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)

    # Evaluation metrics on the test split.
    r2 = r2_score(y_test, y_pred)
    mse = mean_squared_error(y_test, y_pred)
    results.append({
        'Model': name,
        'R²': round(r2, 4),
        'MSE': round(mse, 4),
        # Count coefficients whose magnitude exceeds 1e-3. The previous
        # hasattr() fallback read model.coef_ in the very branch where the
        # attribute was reported missing, so it could only fail.
        '非零系数': int(np.sum(np.abs(model.coef_) > 1e-3)),
    })

    # Bar chart of the fitted coefficients.
    plt.bar(range(n_features), model.coef_)
    plt.xlabel('特征索引')
    plt.ylabel('系数值')
    plt.title(f'{name}\nR²={r2:.4f}, MSE={mse:.4f}')
    plt.grid(True, alpha=0.3)

# Lay out and show the 2x2 figure once all subplots are drawn.
plt.tight_layout()
plt.show()

# Side-by-side comparison of the models.
results_df = pd.DataFrame(results)
print("模型对比:")
print(results_df)
完整的数据分析流程
class RegressionAnalysis:
    """End-to-end regression workflow: load, preprocess, train, plot, predict.

    Typical order of calls: ``load_data`` or ``generate_sample_data``, then
    ``preprocess_data``, ``train_model``, and finally ``plot_results`` and/or
    ``predict_new_data``.
    """

    def __init__(self, data_path=None):
        """Create the analyser and, if ``data_path`` is given, load it.

        Args:
            data_path: Optional path of a CSV file to load immediately.
        """
        self.data = None
        self.model = None
        self.scaler = None
        self.features = None
        self.target = None
        # Train/test splits; populated by preprocess_data().
        self.X_train = None
        self.X_test = None
        self.y_train = None
        self.y_test = None
        if data_path:
            self.load_data(data_path)

    def load_data(self, data_path):
        """Load a CSV file into ``self.data``; fall back to sample data.

        Args:
            data_path: Path of the CSV file to read.
        """
        try:
            self.data = pd.read_csv(data_path)
        except (OSError, ValueError):
            # OSError covers a missing or unreadable file; pandas' parsing
            # errors derive from ValueError. Unlike a bare ``except``, this
            # lets KeyboardInterrupt and SystemExit propagate.
            print("加载失败,生成示例数据...")
            self.generate_sample_data()
        else:
            print(f"成功加载数据,形状: {self.data.shape}")

    def generate_sample_data(self, n_samples=200):
        """Fill ``self.data`` with a seeded synthetic data set.

        The target is a linear function of ``feature1`` and ``feature2`` plus
        noise, so there is a real relationship to fit (it used to be pure
        noise, unrelated to any feature). ``feature3`` does not contribute.

        Args:
            n_samples: Number of rows to generate.
        """
        np.random.seed(42)
        feature1 = np.random.randn(n_samples) * 10
        feature2 = np.random.randn(n_samples) * 5
        feature3 = np.random.randn(n_samples) * 8
        noise = np.random.randn(n_samples) * 0.5
        self.data = pd.DataFrame({
            'feature1': feature1,
            'feature2': feature2,
            'feature3': feature3,
            'target': 3 + 2 * feature1 + 1.5 * feature2 + noise,
        })
        print(f"生成示例数据,形状: {self.data.shape}")

    def preprocess_data(self, target_column, test_size=0.2):
        """Fill missing values, standardise the features and split the data.

        Args:
            target_column: Name of the column to predict.
            test_size: Fraction of the rows held out for evaluation.

        Raises:
            ValueError: If no data has been loaded, or ``target_column`` is
                not a column of the data.
        """
        if self.data is None:
            raise ValueError("尚未加载数据,请先调用 load_data 或 generate_sample_data")
        if target_column not in self.data.columns:
            raise ValueError(f"列 '{target_column}' 不存在")

        # StandardScaler is not imported at module level in this file, so it
        # is imported here to keep the method self-contained.
        from sklearn.preprocessing import StandardScaler

        # Replace missing values with the column means.
        if self.data.isnull().sum().sum() > 0:
            print("检测到缺失值,进行填充...")
            self.data = self.data.fillna(self.data.mean())

        # Separate the target from the features.
        self.target = self.data[target_column]
        self.features = self.data.drop(target_column, axis=1)

        # Standardise the features.
        self.scaler = StandardScaler()
        features_scaled = self.scaler.fit_transform(self.features)

        # Split into training and test sets.
        self.X_train, self.X_test, self.y_train, self.y_test = train_test_split(
            features_scaled, self.target, test_size=test_size, random_state=42
        )
        print("数据预处理完成")
        print(f"训练集大小: {len(self.X_train)}")
        print(f"测试集大小: {len(self.X_test)}")

    def train_model(self, model_type='linear'):
        """Fit the chosen model and report its test-set metrics.

        Args:
            model_type: One of ``'linear'``, ``'ridge'`` or ``'lasso'``.

        Returns:
            A ``(r2, mse)`` tuple computed on the held-out test split.

        Raises:
            ValueError: If ``model_type`` is not supported, or
                ``preprocess_data`` has not been called yet.
        """
        if model_type == 'linear':
            self.model = LinearRegression()
        elif model_type == 'ridge':
            self.model = Ridge(alpha=1.0)
        elif model_type == 'lasso':
            self.model = Lasso(alpha=0.1)
        else:
            raise ValueError(f"不支持的模型类型: {model_type}")

        if self.X_train is None:
            raise ValueError("请先调用 preprocess_data 进行数据预处理")

        self.model.fit(self.X_train, self.y_train)

        # Predict the held-out samples and evaluate.
        y_pred = self.model.predict(self.X_test)
        r2 = r2_score(self.y_test, y_pred)
        mse = mean_squared_error(self.y_test, y_pred)
        print("\n模型训练完成:")
        print(f"模型类型: {model_type}")
        print(f"R² 分数: {r2:.4f}")
        print(f"MSE: {mse:.4f}")
        return r2, mse

    def plot_results(self):
        """Plot predicted-vs-actual, residuals and coefficient magnitudes.

        Raises:
            ValueError: If no model has been trained yet.
        """
        if self.model is None:
            raise ValueError("请先训练模型")

        y_pred = self.model.predict(self.X_test)
        plt.figure(figsize=(15, 5))

        # Panel 1: predicted vs. actual, with the y = x reference line.
        plt.subplot(1, 3, 1)
        plt.scatter(self.y_test, y_pred, alpha=0.6)
        plt.plot([self.y_test.min(), self.y_test.max()],
                 [self.y_test.min(), self.y_test.max()], 'r--', lw=2)
        plt.xlabel('实际值')
        plt.ylabel('预测值')
        plt.title('预测值 vs 实际值')
        plt.grid(True, alpha=0.3)

        # Panel 2: residuals against the predictions.
        plt.subplot(1, 3, 2)
        residuals = self.y_test - y_pred
        plt.scatter(y_pred, residuals, alpha=0.6)
        plt.axhline(y=0, color='r', linestyle='--')
        plt.xlabel('预测值')
        plt.ylabel('残差')
        plt.title('残差图')
        plt.grid(True, alpha=0.3)

        # Panel 3: absolute coefficient per feature, smallest at the bottom.
        plt.subplot(1, 3, 3)
        if hasattr(self.model, 'coef_'):
            feature_importance = pd.DataFrame({
                'feature': self.features.columns,
                'importance': abs(self.model.coef_),
            }).sort_values('importance', ascending=True)
            plt.barh(feature_importance['feature'], feature_importance['importance'])
            plt.xlabel('特征重要性 (|系数|)')
            plt.title('特征重要性分析')

        plt.tight_layout()
        plt.show()

    def predict_new_data(self, new_data):
        """Scale ``new_data`` with the fitted scaler and predict its targets.

        Args:
            new_data: A dict (one sample), a list (passed to
                ``pd.DataFrame``), or anything the fitted scaler accepts.

        Returns:
            The model's predictions for the given samples.

        Raises:
            ValueError: If the model or the scaler is not available yet.
        """
        if self.model is None or self.scaler is None:
            raise ValueError("请先训练模型")

        # Normalise the supported input shapes to a DataFrame.
        if isinstance(new_data, dict):
            new_data = pd.DataFrame([new_data])
        elif isinstance(new_data, list):
            new_data = pd.DataFrame(new_data)

        # The scaler was fitted on the training columns in a fixed order.
        # When the input carries all of them, select them in that order so a
        # differently ordered dict or extra columns cannot misalign features.
        if isinstance(new_data, pd.DataFrame) and self.features is not None:
            expected = list(self.features.columns)
            if all(col in new_data.columns for col in expected):
                new_data = new_data[expected]

        # Scale and predict.
        new_data_scaled = self.scaler.transform(new_data)
        predictions = self.model.predict(new_data_scaled)
        return predictions
# Usage example
if __name__ == "__main__":
    # Build an analyser and populate it with generated sample data.
    analyzer = RegressionAnalysis()
    analyzer.generate_sample_data()

    # Preprocess, train and visualise, in that order.
    analyzer.preprocess_data(target_column='target')
    analyzer.train_model(model_type='linear')
    analyzer.plot_results()

    # Predict a single new sample given as a feature-name -> value mapping.
    new_data = dict(feature1=1.5, feature2=-0.5, feature3=2.0)
    prediction = analyzer.predict_new_data(new_data)
    print(f"\n新数据预测结果: {prediction[0]:.4f}")
实际案例分析:房价预测
# Simulated housing data (seeded so the run is reproducible).
np.random.seed(42)
n_houses = 1000

# Feature columns; they are drawn in this order from the global random stream.
house_data = pd.DataFrame({
    'area': np.random.normal(100, 30, n_houses),             # floor area (square metres)
    'bedrooms': np.random.randint(1, 5, n_houses),           # number of bedrooms
    'age': np.random.randint(0, 50, n_houses),               # age of the building
    'distance_center': np.random.exponential(5, n_houses),   # distance to the city centre
    'floor': np.random.randint(1, 20, n_houses),             # floor number
})

# Price is a linear combination of the features plus Gaussian noise; the
# noise is drawn last, after every feature column.
house_data = house_data.assign(
    price=lambda d: (
        50
        + 0.5 * d['area']
        + 10 * d['bedrooms']
        - 0.3 * d['age']
        - 2 * d['distance_center']
        + 0.5 * d['floor']
        + np.random.randn(n_houses) * 10
    )
)

# Summary statistics of the generated data.
print("房价数据统计:")
print(house_data.describe())

# Run the regression workflow on the housing data.
analyzer_house = RegressionAnalysis()
analyzer_house.data = house_data
analyzer_house.preprocess_data(target_column='price')
analyzer_house.train_model(model_type='linear')
analyzer_house.plot_results()

# Rank the features by the absolute value of their fitted coefficient.
print("\n特征重要性(按系数绝对值排序):")
feature_importance = pd.DataFrame({
    '特征': house_data.columns.drop('price'),
    '系数': abs(analyzer_house.model.coef_),
}).sort_values(by='系数', ascending=False)
print(feature_importance)
总结:回归分析的关键步骤如下。
- 数据预处理:处理缺失值、特征标准化、数据分割
- 模型选择:根据数据特性选择线性回归、多项式回归或正则化回归
- 模型评估:使用R²、MSE、残差分析等指标
- 可视化:预测vs实际图、残差图、特征重要性图
- 特征工程:相关性分析、特征选择、多项式特征
这些案例涵盖了从简单到复杂的回归分析场景,可以根据实际需求进行调整和扩展。