Python案例做基线检查?

wen 网络安全 48

本文目录导读:

Python案例做基线检查?

  1. 完整的基线检查系统案例
  2. 使用说明
  3. 检查项说明
  4. 输出说明
  5. 扩展建议

我将为您提供一个Python基线检查的完整案例,基线检查主要用于验证系统配置是否符合安全标准或合规要求。

完整的基线检查系统案例

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
系统安全基线检查工具
检查Linux系统配置是否符合安全标准
"""
import os
import sys
import json
import pwd
import grp
import datetime
import subprocess
import hashlib
from typing import Dict, List, Tuple, Any
class BaselineChecker:
    """系统基线检查器"""
    def __init__(self, report_file: str = "baseline_report.json"):
        self.report_file = report_file
        self.results = {
            "timestamp": datetime.datetime.now().isoformat(),
            "hostname": self._get_hostname(),
            "checks": []
        }
    def _get_hostname(self) -> str:
        """获取主机名"""
        try:
            return os.uname()[1]
        except:
            return subprocess.getoutput("hostname")
    def _run_command(self, command: str) -> Tuple[int, str, str]:
        """执行系统命令"""
        try:
            process = subprocess.Popen(
                command,
                shell=True,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                universal_newlines=True
            )
            stdout, stderr = process.communicate()
            return process.returncode, stdout.strip(), stderr.strip()
        except Exception as e:
            return -1, "", str(e)
    def _add_result(self, check_name: str, passed: bool, 
                   description: str, detail: Any = None) -> None:
        """添加检查结果"""
        result = {
            "check_name": check_name,
            "passed": passed,
            "description": description,
            "detail": detail,
            "timestamp": datetime.datetime.now().isoformat()
        }
        self.results["checks"].append(result)
        # 输出结果
        status = "✓" if passed else "✗"
        print(f"[{status}] {check_name}: {description}")
    # ============== 账户和密码策略检查 ==============
    def check_password_policy(self) -> None:
        """检查密码策略"""
        issues = []
        # 检查密码策略配置文件
        try:
            with open('/etc/login.defs', 'r') as f:
                content = f.read()
            # 检查密码最小长度
            if 'PASS_MIN_LEN' in content:
                for line in content.split('\n'):
                    if 'PASS_MIN_LEN' in line and not line.startswith('#'):
                        min_len = int(line.split()[-1])
                        if min_len < 8:
                            issues.append(f"密码最小长度 {min_len} < 8")
            # 检查密码最大年龄
            if 'PASS_MAX_DAYS' in content:
                for line in content.split('\n'):
                    if 'PASS_MAX_DAYS' in line and not line.startswith('#'):
                        max_days = int(line.split()[-1])
                        if max_days > 90:
                            issues.append(f"密码最大使用天数 {max_days} > 90")
        except FileNotFoundError:
            issues.append("未找到 /etc/login.defs 文件")
        # 检查shadow文件权限
        try:
            stat = os.stat('/etc/shadow')
            if stat.st_mode & 0o077:  # 检查其他用户权限
                issues.append("shadow文件权限过于宽松")
        except:
            issues.append("无法访问 /etc/shadow")
        self._add_result(
            "password_policy",
            len(issues) == 0,
            "密码策略检查",
            issues if issues else "策略符合要求"
        )
    def check_empty_passwords(self) -> None:
        """检查空密码账户"""
        ret_code, output, error = self._run_command(
            "awk -F: '($2 == \"\") {print $1}' /etc/shadow"
        )
        empty_users = output.split('\n') if output else []
        self._add_result(
            "empty_passwords",
            len(empty_users) == 0,
            "空密码账户检查",
            f"空密码用户: {empty_users}" if empty_users else "未发现空密码账户"
        )
    # ============== 文件系统权限检查 ==============
    def check_suid_sgid_files(self) -> None:
        """检查SUID/SGID文件"""
        ret_code, output, error = self._run_command(
            "find / -type f \\( -perm -4000 -o -perm -2000 \\) 2>/dev/null"
        )
        suid_files = output.split('\n') if output else []
        # 常见合法的SUID/SGID程序
        legitimate = ['/usr/bin/passwd', '/usr/bin/su', '/usr/bin/sudo',
                     '/usr/bin/newgrp', '/usr/bin/chsh', '/usr/bin/chfn',
                     '/usr/bin/gpasswd', '/usr/bin/pkexec']
        suspicious = [f for f in suid_files if f not in legitimate]
        self._add_result(
            "suid_sgid_files",
            len(suspicious) == 0,
            "SUID/SGID文件检查",
            f"可疑SUID/SGID文件: {suspicious}" if suspicious else "无异常SUID/SGID文件"
        )
    def check_world_writable_files(self) -> None:
        """检查全局可写文件"""
        ret_code, output, error = self._run_command(
            "find / -perm -2 -type f ! -path '/proc/*' ! -path '/sys/*' 2>/dev/null"
        )
        writable_files = output.split('\n')[:10] if output else []  # 只显示前10个
        self._add_result(
            "world_writable_files",
            len(writable_files) == 0,
            "全局可写文件检查",
            f"全局可写文件 (前10个): {writable_files}" if writable_files else "无异常全局可写文件"
        )
    def check_home_directory_permissions(self) -> None:
        """检查用户主目录权限"""
        issues = []
        users = pwd.getpwall()
        for user in users:
            if user.pw_dir != '/nonexistent' and os.path.exists(user.pw_dir):
                try:
                    stat_info = os.stat(user.pw_dir)
                    # 检查主目录权限是否过于宽松(其他用户有写权限)
                    if stat_info.st_mode & 0o022:
                        issues.append(f"用户 {user.pw_name} 的主目录权限问题: {oct(stat_info.st_mode)}")
                except:
                    continue
        self._add_result(
            "home_directory_permissions",
            len(issues) == 0,
            "用户主目录权限检查",
            issues if issues else "所有用户主目录权限正常"
        )
    # ============== 服务检查 ==============
    def check_unnecessary_services(self) -> None:
        """检查不必要的服务"""
        unnecessary_services = ['telnet', 'rsh', 'rlogin', 'vsftpd', 'tftp']
        running_services = []
        for service in unnecessary_services:
            ret_code, output, error = self._run_command(f"systemctl is-active {service} 2>/dev/null")
            if output == 'active':
                running_services.append(service)
        self._add_result(
            "unnecessary_services",
            len(running_services) == 0,
            "不必要服务检查",
            f"正在运行的不必要服务: {running_services}" if running_services else "无不必要服务运行"
        )
    def check_ssh_configuration(self) -> None:
        """检查SSH配置"""
        issues = []
        sshd_config = '/etc/ssh/sshd_config'
        if not os.path.exists(sshd_config):
            self._add_result("ssh_configuration", False, "SSH配置检查", "未找到sshd_config文件")
            return
        with open(sshd_config, 'r') as f:
            content = f.read()
        # 检查关键配置
        checks = {
            'PermitRootLogin': (r'PermitRootLogin\s+yes', "允许root登录"),
            'Protocol': (r'Protocol\s+1', "使用不安全的SSH协议版本1"),
            'PermitEmptyPasswords': (r'PermitEmptyPasswords\s+yes', "允许空密码"),
        }
        for key, (pattern, desc) in checks.items():
            import re
            if re.search(pattern, content, re.IGNORECASE):
                issues.append(desc)
        self._add_result(
            "ssh_configuration",
            len(issues) == 0,
            "SSH配置检查",
            issues if issues else "SSH配置安全"
        )
    # ============== 网络和安全检查 ==============
    def check_open_ports(self) -> None:
        """检查开放端口"""
        ret_code, output, error = self._run_command("ss -tlnp 2>/dev/null")
        open_ports = []
        for line in output.split('\n')[1:]:  # 跳过标题行
            if line.strip():
                parts = line.split()
                if len(parts) >= 4:
                    port_info = parts[3]
                    open_ports.append(port_info)
        # 检查是否有高危端口
        high_risk_ports = ['23', '21', '135', '137', '138', '139', '445']
        found_risk = [port for port in open_ports 
                     if any(hp in port for hp in high_risk_ports)]
        self._add_result(
            "open_ports",
            len(found_risk) == 0,
            "开放端口检查",
            f"高危端口: {found_risk}" if found_risk else f"开放端口数: {len(open_ports)}"
        )
    def check_firewall_status(self) -> None:
        """检查防火墙状态"""
        ret_code, output, error = self._run_command("systemctl is-active firewalld")
        firewall_active = output == 'active'
        # 如果firewalld未运行,检查iptables
        if not firewall_active:
            ret_code, output, error = self._run_command("iptables -L -n 2>/dev/null")
            firewall_active = bool(output and 'Chain' in output)
        self._add_result(
            "firewall_status",
            firewall_active,
            "防火墙状态检查",
            f"防火墙状态: {'运行中' if firewall_active else '未运行'}"
        )
    # ============== 日志和审计检查 ==============
    def check_audit_logging(self) -> None:
        """检查审计日志配置"""
        issues = []
        # 检查auditd服务
        ret_code, output, error = self._run_command("systemctl is-active auditd")
        if output != 'active':
            issues.append("auditd服务未运行")
        # 检查rsyslog服务
        ret_code, output, error = self._run_command("systemctl is-active rsyslog")
        if output != 'active':
            issues.append("rsyslog服务未运行")
        # 检查日志轮转配置
        if not os.path.exists('/etc/logrotate.conf'):
            issues.append("未配置日志轮转")
        self._add_result(
            "audit_logging",
            len(issues) == 0,
            "审计日志检查",
            issues if issues else "审计日志配置正常"
        )
    # ============== 核心转储检查 ==============
    def check_core_dump(self) -> None:
        """检查核心转储配置"""
        issues = []
        try:
            with open('/etc/security/limits.conf', 'r') as f:
                content = f.read()
            if '* soft core 0' not in content and '* hard core 0' not in content:
                issues.append("未禁用核心转储")
        except:
            issues.append("无法读取limits.conf")
        # 检查内核参数
        ret_code, output, error = self._run_command(
            "sysctl fs.suid_dumpable 2>/dev/null | awk '{print $3}'"
        )
        if output == '1' or output == '2':
            issues.append(f"核心转储内核参数不安全: fs.suid_dumpable={output}")
        self._add_result(
            "core_dump",
            len(issues) == 0,
            "核心转储检查",
            issues if issues else "核心转储配置安全"
        )
    # ============== 生成报告 ==============
    def generate_report(self) -> None:
        """生成检查报告"""
        # 计算总体通过率
        total_checks = len(self.results["checks"])
        passed_checks = sum(1 for check in self.results["checks"] if check["passed"])
        self.results["summary"] = {
            "total_checks": total_checks,
            "passed_checks": passed_checks,
            "failed_checks": total_checks - passed_checks,
            "pass_rate": f"{(passed_checks/total_checks*100):.1f}%" if total_checks > 0 else "0%"
        }
        # 写入文件
        with open(self.report_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        print(f"\n{'='*60}")
        print(f"检查报告已保存到: {self.report_file}")
        print(f"总检查项: {total_checks}")
        print(f"通过: {passed_checks}")
        print(f"失败: {total_checks - passed_checks}")
        print(f"通过率: {self.results['summary']['pass_rate']}")
        print('='*60)
    def run_all_checks(self) -> None:
        """运行所有基线检查"""
        print(f"\n开始进行系统基线检查 - {datetime.datetime.now()}")
        print('='*60)
        print("\n1. 账户和密码策略检查")
        print("-" * 30)
        self.check_password_policy()
        self.check_empty_passwords()
        print("\n2. 文件系统权限检查")
        print("-" * 30)
        self.check_suid_sgid_files()
        self.check_world_writable_files()
        self.check_home_directory_permissions()
        print("\n3. 服务检查")
        print("-" * 30)
        self.check_unnecessary_services()
        self.check_ssh_configuration()
        print("\n4. 网络和安全检查")
        print("-" * 30)
        self.check_open_ports()
        self.check_firewall_status()
        print("\n5. 日志和审计检查")
        print("-" * 30)
        self.check_audit_logging()
        print("\n6. 系统配置检查")
        print("-" * 30)
        self.check_core_dump()
        self.generate_report()
# ============== 使用示例 ==============
def main():
    """主函数"""
    checker = BaselineChecker()
    print("="*60)
    print("系统安全基线检查工具 v1.0")
    print("="*60)
    # 运行所有检查
    checker.run_all_checks()
    # 或者单独运行特定的检查
    # checker.check_password_policy()
    # checker.check_ssh_configuration()
    # 生成自定义格式的报告
    print("\n生成SVG格式报告...")
    generate_svg_report(checker.results)
def generate_svg_report(results: Dict) -> None:
    """生成简单的SVG格式报告(示例)"""
    summary = results["summary"]
    svg_content = f"""<?xml version="1.0" encoding="UTF-8"?>
<svg xmlns="http://www.w3.org/2000/svg" width="800" height="600">
    <rect width="800" height="600" fill="#f0f0f0"/>
    <text x="400" y="50" text-anchor="middle" font-size="24" font-weight="bold">
        系统基线检查报告
    </text>
    <text x="400" y="80" text-anchor="middle" font-size="14">
        主机: {results['hostname']}
    </text>
    <text x="400" y="100" text-anchor="middle" font-size="14">
        时间: {results['timestamp']}
    </text>
    <!-- 统计信息 -->
    <rect x="50" y="130" width="700" height="150" rx="10" fill="white" stroke="#ccc"/>
    <text x="400" y="160" text-anchor="middle" font-size="16" font-weight="bold">
        检查结果统计
    </text>
    <text x="200" y="190" font-size="14">总检查项: {summary['total_checks']}</text>
    <text x="200" y="215" font-size="14" fill="green">通过: {summary['passed_checks']}</text>
    <text x="200" y="240" font-size="14" fill="red">失败: {summary['failed_checks']}</text>
    <text x="200" y="265" font-size="14">通过率: {summary['pass_rate']}</text>
    <!-- 通过率进度条 -->
    <rect x="400" y="190" width="300" height="20" rx="10" fill="#e0e0e0"/>
    <rect x="400" y="190" width="{300 * summary['passed_checks'] // max(summary['total_checks'], 1)}" 
          height="20" rx="10" fill="#4CAF50"/>
    <!-- 检查列表 -->
    <text x="50" y="320" font-size="16" font-weight="bold">检查项详情:</text>
"""
    y_pos = 345
    for check in results["checks"]:
        color = "green" if check["passed"] else "red"
        symbol = "✓" if check["passed"] else "✗"
        svg_content += f"""
    <text x="70" y="{y_pos}" font-size="12" fill="{color}">{symbol}</text>
    <text x="90" y="{y_pos}" font-size="12">{check['check_name']}: {check['description']}</text>
"""
        y_pos += 25
        # 如果超过SVG高度,停止
        if y_pos > 580:
            break
    svg_content += "\n</svg>"
    # 保存SVG文件
    svg_file = "baseline_report.svg"
    with open(svg_file, 'w', encoding='utf-8') as f:
        f.write(svg_content)
    print(f"SVG报告已保存到: {svg_file}")
if __name__ == "__main__":
    # 检查是否以root权限运行
    if os.geteuid() != 0:
        print("警告: 建议以root权限运行以获得完整的检查结果")
        response = input("是否继续? (y/n): ")
        if response.lower() != 'y':
            sys.exit(1)
    main()

使用说明

基本使用

# 运行所有基线检查
python3 baseline_checker.py
# 以root权限运行(推荐)
sudo python3 baseline_checker.py

自定义检查项

# 创建自定义检查器
checker = BaselineChecker()
# 只运行特定检查
checker.check_password_policy()
checker.check_ssh_configuration()
# 生成报告
checker.generate_report()

扩展检查项

class CustomChecker(BaselineChecker):
    """自定义检查器,添加新的检查项"""
    def check_custom_security(self) -> None:
        """自定义安全检查"""
        # 实现自定义检查逻辑
        pass

检查项说明

账户和密码策略

  • 密码最小长度
  • 密码最大使用期限
  • 空密码账户检测
  • shadow文件权限

文件系统权限

  • SUID/SGID文件
  • 全局可写文件
  • 用户主目录权限

服务配置

  • 不必要的服务
  • SSH安全配置
  • Root登录限制

网络安全

  • 端口开放情况
  • 防火墙状态
  • 高危端口检测

审计日志

  • 审计服务状态
  • 日志配置
  • 日志轮转

系统配置

  • 核心转储配置
  • 内核参数

输出说明

工具会生成两种格式的检查报告:

  1. JSON格式: 详细的结构化数据,便于程序处理
  2. SVG格式: 可视化报告,可直接展示

扩展建议

您可以根据需要添加更多检查项:

# 添加补丁管理检查
def check_patch_level(self):
    """检查系统补丁更新"""
    pass
# 添加用户权限检查
def check_user_permissions(self):
    """检查用户权限配置"""
    pass
# 添加文件完整性检查
def check_file_integrity(self):
    """检查关键文件完整性"""
    pass

这个基线检查工具是一个良好的起点,您可以根据组织的要求进行定制和扩展。

抱歉,评论功能暂时关闭!