本文目录导读:

我将为您提供一个Python基线检查的完整案例,基线检查主要用于验证系统配置是否符合安全标准或合规要求。
完整的基线检查系统案例
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系统安全基线检查工具
检查Linux系统配置是否符合安全标准
"""
import os
import sys
import json
import pwd
import grp
import datetime
import subprocess
import hashlib
from typing import Dict, List, Tuple, Any
class BaselineChecker:
"""系统基线检查器"""
def __init__(self, report_file: str = "baseline_report.json"):
self.report_file = report_file
self.results = {
"timestamp": datetime.datetime.now().isoformat(),
"hostname": self._get_hostname(),
"checks": []
}
def _get_hostname(self) -> str:
"""获取主机名"""
try:
return os.uname()[1]
except:
return subprocess.getoutput("hostname")
def _run_command(self, command: str) -> Tuple[int, str, str]:
"""执行系统命令"""
try:
process = subprocess.Popen(
command,
shell=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
universal_newlines=True
)
stdout, stderr = process.communicate()
return process.returncode, stdout.strip(), stderr.strip()
except Exception as e:
return -1, "", str(e)
def _add_result(self, check_name: str, passed: bool,
description: str, detail: Any = None) -> None:
"""添加检查结果"""
result = {
"check_name": check_name,
"passed": passed,
"description": description,
"detail": detail,
"timestamp": datetime.datetime.now().isoformat()
}
self.results["checks"].append(result)
# 输出结果
status = "✓" if passed else "✗"
print(f"[{status}] {check_name}: {description}")
# ============== 账户和密码策略检查 ==============
def check_password_policy(self) -> None:
"""检查密码策略"""
issues = []
# 检查密码策略配置文件
try:
with open('/etc/login.defs', 'r') as f:
content = f.read()
# 检查密码最小长度
if 'PASS_MIN_LEN' in content:
for line in content.split('\n'):
if 'PASS_MIN_LEN' in line and not line.startswith('#'):
min_len = int(line.split()[-1])
if min_len < 8:
issues.append(f"密码最小长度 {min_len} < 8")
# 检查密码最大年龄
if 'PASS_MAX_DAYS' in content:
for line in content.split('\n'):
if 'PASS_MAX_DAYS' in line and not line.startswith('#'):
max_days = int(line.split()[-1])
if max_days > 90:
issues.append(f"密码最大使用天数 {max_days} > 90")
except FileNotFoundError:
issues.append("未找到 /etc/login.defs 文件")
# 检查shadow文件权限
try:
stat = os.stat('/etc/shadow')
if stat.st_mode & 0o077: # 检查其他用户权限
issues.append("shadow文件权限过于宽松")
except:
issues.append("无法访问 /etc/shadow")
self._add_result(
"password_policy",
len(issues) == 0,
"密码策略检查",
issues if issues else "策略符合要求"
)
def check_empty_passwords(self) -> None:
"""检查空密码账户"""
ret_code, output, error = self._run_command(
"awk -F: '($2 == \"\") {print $1}' /etc/shadow"
)
empty_users = output.split('\n') if output else []
self._add_result(
"empty_passwords",
len(empty_users) == 0,
"空密码账户检查",
f"空密码用户: {empty_users}" if empty_users else "未发现空密码账户"
)
# ============== 文件系统权限检查 ==============
def check_suid_sgid_files(self) -> None:
"""检查SUID/SGID文件"""
ret_code, output, error = self._run_command(
"find / -type f \\( -perm -4000 -o -perm -2000 \\) 2>/dev/null"
)
suid_files = output.split('\n') if output else []
# 常见合法的SUID/SGID程序
legitimate = ['/usr/bin/passwd', '/usr/bin/su', '/usr/bin/sudo',
'/usr/bin/newgrp', '/usr/bin/chsh', '/usr/bin/chfn',
'/usr/bin/gpasswd', '/usr/bin/pkexec']
suspicious = [f for f in suid_files if f not in legitimate]
self._add_result(
"suid_sgid_files",
len(suspicious) == 0,
"SUID/SGID文件检查",
f"可疑SUID/SGID文件: {suspicious}" if suspicious else "无异常SUID/SGID文件"
)
def check_world_writable_files(self) -> None:
"""检查全局可写文件"""
ret_code, output, error = self._run_command(
"find / -perm -2 -type f ! -path '/proc/*' ! -path '/sys/*' 2>/dev/null"
)
writable_files = output.split('\n')[:10] if output else [] # 只显示前10个
self._add_result(
"world_writable_files",
len(writable_files) == 0,
"全局可写文件检查",
f"全局可写文件 (前10个): {writable_files}" if writable_files else "无异常全局可写文件"
)
def check_home_directory_permissions(self) -> None:
"""检查用户主目录权限"""
issues = []
users = pwd.getpwall()
for user in users:
if user.pw_dir != '/nonexistent' and os.path.exists(user.pw_dir):
try:
stat_info = os.stat(user.pw_dir)
# 检查主目录权限是否过于宽松(其他用户有写权限)
if stat_info.st_mode & 0o022:
issues.append(f"用户 {user.pw_name} 的主目录权限问题: {oct(stat_info.st_mode)}")
except:
continue
self._add_result(
"home_directory_permissions",
len(issues) == 0,
"用户主目录权限检查",
issues if issues else "所有用户主目录权限正常"
)
# ============== 服务检查 ==============
def check_unnecessary_services(self) -> None:
"""检查不必要的服务"""
unnecessary_services = ['telnet', 'rsh', 'rlogin', 'vsftpd', 'tftp']
running_services = []
for service in unnecessary_services:
ret_code, output, error = self._run_command(f"systemctl is-active {service} 2>/dev/null")
if output == 'active':
running_services.append(service)
self._add_result(
"unnecessary_services",
len(running_services) == 0,
"不必要服务检查",
f"正在运行的不必要服务: {running_services}" if running_services else "无不必要服务运行"
)
def check_ssh_configuration(self) -> None:
"""检查SSH配置"""
issues = []
sshd_config = '/etc/ssh/sshd_config'
if not os.path.exists(sshd_config):
self._add_result("ssh_configuration", False, "SSH配置检查", "未找到sshd_config文件")
return
with open(sshd_config, 'r') as f:
content = f.read()
# 检查关键配置
checks = {
'PermitRootLogin': (r'PermitRootLogin\s+yes', "允许root登录"),
'Protocol': (r'Protocol\s+1', "使用不安全的SSH协议版本1"),
'PermitEmptyPasswords': (r'PermitEmptyPasswords\s+yes', "允许空密码"),
}
for key, (pattern, desc) in checks.items():
import re
if re.search(pattern, content, re.IGNORECASE):
issues.append(desc)
self._add_result(
"ssh_configuration",
len(issues) == 0,
"SSH配置检查",
issues if issues else "SSH配置安全"
)
# ============== 网络和安全检查 ==============
def check_open_ports(self) -> None:
"""检查开放端口"""
ret_code, output, error = self._run_command("ss -tlnp 2>/dev/null")
open_ports = []
for line in output.split('\n')[1:]: # 跳过标题行
if line.strip():
parts = line.split()
if len(parts) >= 4:
port_info = parts[3]
open_ports.append(port_info)
# 检查是否有高危端口
high_risk_ports = ['23', '21', '135', '137', '138', '139', '445']
found_risk = [port for port in open_ports
if any(hp in port for hp in high_risk_ports)]
self._add_result(
"open_ports",
len(found_risk) == 0,
"开放端口检查",
f"高危端口: {found_risk}" if found_risk else f"开放端口数: {len(open_ports)}"
)
def check_firewall_status(self) -> None:
"""检查防火墙状态"""
ret_code, output, error = self._run_command("systemctl is-active firewalld")
firewall_active = output == 'active'
# 如果firewalld未运行,检查iptables
if not firewall_active:
ret_code, output, error = self._run_command("iptables -L -n 2>/dev/null")
firewall_active = bool(output and 'Chain' in output)
self._add_result(
"firewall_status",
firewall_active,
"防火墙状态检查",
f"防火墙状态: {'运行中' if firewall_active else '未运行'}"
)
# ============== 日志和审计检查 ==============
def check_audit_logging(self) -> None:
"""检查审计日志配置"""
issues = []
# 检查auditd服务
ret_code, output, error = self._run_command("systemctl is-active auditd")
if output != 'active':
issues.append("auditd服务未运行")
# 检查rsyslog服务
ret_code, output, error = self._run_command("systemctl is-active rsyslog")
if output != 'active':
issues.append("rsyslog服务未运行")
# 检查日志轮转配置
if not os.path.exists('/etc/logrotate.conf'):
issues.append("未配置日志轮转")
self._add_result(
"audit_logging",
len(issues) == 0,
"审计日志检查",
issues if issues else "审计日志配置正常"
)
# ============== 核心转储检查 ==============
def check_core_dump(self) -> None:
"""检查核心转储配置"""
issues = []
try:
with open('/etc/security/limits.conf', 'r') as f:
content = f.read()
if '* soft core 0' not in content and '* hard core 0' not in content:
issues.append("未禁用核心转储")
except:
issues.append("无法读取limits.conf")
# 检查内核参数
ret_code, output, error = self._run_command(
"sysctl fs.suid_dumpable 2>/dev/null | awk '{print $3}'"
)
if output == '1' or output == '2':
issues.append(f"核心转储内核参数不安全: fs.suid_dumpable={output}")
self._add_result(
"core_dump",
len(issues) == 0,
"核心转储检查",
issues if issues else "核心转储配置安全"
)
# ============== 生成报告 ==============
def generate_report(self) -> None:
"""生成检查报告"""
# 计算总体通过率
total_checks = len(self.results["checks"])
passed_checks = sum(1 for check in self.results["checks"] if check["passed"])
self.results["summary"] = {
"total_checks": total_checks,
"passed_checks": passed_checks,
"failed_checks": total_checks - passed_checks,
"pass_rate": f"{(passed_checks/total_checks*100):.1f}%" if total_checks > 0 else "0%"
}
# 写入文件
with open(self.report_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, ensure_ascii=False, indent=2)
print(f"\n{'='*60}")
print(f"检查报告已保存到: {self.report_file}")
print(f"总检查项: {total_checks}")
print(f"通过: {passed_checks}")
print(f"失败: {total_checks - passed_checks}")
print(f"通过率: {self.results['summary']['pass_rate']}")
print('='*60)
def run_all_checks(self) -> None:
"""运行所有基线检查"""
print(f"\n开始进行系统基线检查 - {datetime.datetime.now()}")
print('='*60)
print("\n1. 账户和密码策略检查")
print("-" * 30)
self.check_password_policy()
self.check_empty_passwords()
print("\n2. 文件系统权限检查")
print("-" * 30)
self.check_suid_sgid_files()
self.check_world_writable_files()
self.check_home_directory_permissions()
print("\n3. 服务检查")
print("-" * 30)
self.check_unnecessary_services()
self.check_ssh_configuration()
print("\n4. 网络和安全检查")
print("-" * 30)
self.check_open_ports()
self.check_firewall_status()
print("\n5. 日志和审计检查")
print("-" * 30)
self.check_audit_logging()
print("\n6. 系统配置检查")
print("-" * 30)
self.check_core_dump()
self.generate_report()
# ============== 使用示例 ==============
def main():
"""主函数"""
checker = BaselineChecker()
print("="*60)
print("系统安全基线检查工具 v1.0")
print("="*60)
# 运行所有检查
checker.run_all_checks()
# 或者单独运行特定的检查
# checker.check_password_policy()
# checker.check_ssh_configuration()
# 生成自定义格式的报告
print("\n生成SVG格式报告...")
generate_svg_report(checker.results)
def generate_svg_report(results: Dict) -> None:
"""生成简单的SVG格式报告(示例)"""
summary = results["summary"]
svg_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" width="800" height="600">
<rect width="800" height="600" fill="#f0f0f0"/>
<text x="400" y="50" text-anchor="middle" font-size="24" font-weight="bold">
系统基线检查报告
</text>
<text x="400" y="80" text-anchor="middle" font-size="14">
主机: {results['hostname']}
</text>
<text x="400" y="100" text-anchor="middle" font-size="14">
时间: {results['timestamp']}
</text>
<!-- 统计信息 -->
<rect x="50" y="130" width="700" height="150" rx="10" fill="white" stroke="#ccc"/>
<text x="400" y="160" text-anchor="middle" font-size="16" font-weight="bold">
检查结果统计
</text>
<text x="200" y="190" font-size="14">总检查项: {summary['total_checks']}</text>
<text x="200" y="215" font-size="14" fill="green">通过: {summary['passed_checks']}</text>
<text x="200" y="240" font-size="14" fill="red">失败: {summary['failed_checks']}</text>
<text x="200" y="265" font-size="14">通过率: {summary['pass_rate']}</text>
<!-- 通过率进度条 -->
<rect x="400" y="190" width="300" height="20" rx="10" fill="#e0e0e0"/>
<rect x="400" y="190" width="{300 * summary['passed_checks'] // max(summary['total_checks'], 1)}"
height="20" rx="10" fill="#4CAF50"/>
<!-- 检查列表 -->
<text x="50" y="320" font-size="16" font-weight="bold">检查项详情:</text>
"""
y_pos = 345
for check in results["checks"]:
color = "green" if check["passed"] else "red"
symbol = "✓" if check["passed"] else "✗"
svg_content += f"""
<text x="70" y="{y_pos}" font-size="12" fill="{color}">{symbol}</text>
<text x="90" y="{y_pos}" font-size="12">{check['check_name']}: {check['description']}</text>
"""
y_pos += 25
# 如果超过SVG高度,停止
if y_pos > 580:
break
svg_content += "\n</svg>"
# 保存SVG文件
svg_file = "baseline_report.svg"
with open(svg_file, 'w', encoding='utf-8') as f:
f.write(svg_content)
print(f"SVG报告已保存到: {svg_file}")
if __name__ == "__main__":
# 检查是否以root权限运行
if os.geteuid() != 0:
print("警告: 建议以root权限运行以获得完整的检查结果")
response = input("是否继续? (y/n): ")
if response.lower() != 'y':
sys.exit(1)
main()
使用说明
基本使用
# 运行所有基线检查 python3 baseline_checker.py # 以root权限运行(推荐) sudo python3 baseline_checker.py
自定义检查项
# 创建自定义检查器 checker = BaselineChecker() # 只运行特定检查 checker.check_password_policy() checker.check_ssh_configuration() # 生成报告 checker.generate_report()
扩展检查项
class CustomChecker(BaselineChecker):
"""自定义检查器,添加新的检查项"""
def check_custom_security(self) -> None:
"""自定义安全检查"""
# 实现自定义检查逻辑
pass
检查项说明
账户和密码策略
- 密码最小长度
- 密码最大使用期限
- 空密码账户检测
- shadow文件权限
文件系统权限
- SUID/SGID文件
- 全局可写文件
- 用户主目录权限
服务配置
- 不必要的服务
- SSH安全配置
- Root登录限制
网络安全
- 端口开放情况
- 防火墙状态
- 高危端口检测
审计日志
- 审计服务状态
- 日志配置
- 日志轮转
系统配置
- 核心转储配置
- 内核参数
输出说明
工具会生成两种格式的检查报告:
- JSON格式: 详细的结构化数据,便于程序处理
- SVG格式: 可视化报告,可直接展示
扩展建议
您可以根据需要添加更多检查项:
# 添加补丁管理检查
def check_patch_level(self):
"""检查系统补丁更新"""
pass
# 添加用户权限检查
def check_user_permissions(self):
"""检查用户权限配置"""
pass
# 添加文件完整性检查
def check_file_integrity(self):
"""检查关键文件完整性"""
pass
这个基线检查工具是一个良好的起点,您可以根据组织的要求进行定制和扩展。