实用脚本能批量监控吗?

wen 实用脚本 51

本文目录导读:

实用脚本能批量监控吗?

  1. 批量监控网站/API 是否在线
  2. 批量监控服务器资源(CPU、内存、磁盘)
  3. 批量监控日志异常
  4. 批量监控数据库连接池或慢查询
  5. 重要优化点(批量监控必须考虑的)
  6. 一个“开箱即用”的批量监控实用脚本框架

可以的。实用脚本完全可以实现批量监控,而且这是脚本语言的强项,无论是监控服务器状态、网站可用性、日志异常,还是资源消耗,脚本都能通过循环、并发或调度实现高效批量操作。

针对不同场景,我整理了几种实用脚本批量监控方案与示例:

批量监控网站/API 是否在线

场景:有100个业务URL,需要每隔5分钟检查一次,异常时告警。

Shell 脚本示例(单线程,简单可靠)

#!/bin/bash
URLS=("https://example1.com" "https://example2.com" "https://example3.com")
TIMEOUT=5
LOG_FILE="/tmp/web_monitor.log"
for url in "${URLS[@]}"; do
    HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" --max-time $TIMEOUT "$url")
    if [ "$HTTP_CODE" -ne 200 ]; then
        echo "$(date) [ERROR] $url 返回状态码: $HTTP_CODE" >> $LOG_FILE
        # 这里可以插入发送邮件、钉钉/企业微信机器人消息等
    else
        echo "$(date) [OK] $url 正常" >> $LOG_FILE
    fi
done

优化建议:对于大量URL,上述脚本是顺序执行的,100个URL大约耗时 100×5s=500秒,要提速,可以改用 xargs -P 并行:

cat url_list.txt | xargs -I {} -P 20 sh -c 'curl -s -o /dev/null -w "%{http_code}" --max-time 5 "$1 || echo "Failed"'

Python 脚本示例(功能更灵活,支持并发)

import requests
import concurrent.futures
import time
urls = ["https://site1.com", "https://site2.com", ...]
def check_url(url):
    try:
        r = requests.get(url, timeout=5)
        if r.status_code != 200:
            return f"{url} 异常,状态码: {r.status_code}"
        return f"{url} 正常"
    except Exception as e:
        return f"{url} 异常: {e}"
# 使用线程池并发监控,同时监控20个
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
    results = executor.map(check_url, urls)
    for result in results:
        print(result)
        # 写入日志或发送告警

批量监控服务器资源(CPU、内存、磁盘)

场景:你有50台Linux服务器,需要批量检查CPU使用率是否超过90%,内存是否不足。

SSH + Shell 批量执行

#!/bin/bash
SERVERS=("192.168.1.101" "192.168.1.102" "192.168.1.103")
USER="monitor"
KEY="/path/to/ssh_key"
THRESHOLD=90
for server in "${SERVERS[@]}"; do
    # 远程执行 top 或 free 命令,抓取CPU/内存值
    CPU_USAGE=$(ssh -i $KEY -o ConnectTimeout=5 $USER@$server "top -bn1 | grep 'Cpu(s)' | awk '{print \$2}' | cut -d'%' -f1")
    MEM_USAGE=$(ssh -i $KEY -o ConnectTimeout=5 $USER@$server "free | grep Mem | awk '{print \$3/\$2 * 100.0}'")
    if (( $(echo "$CPU_USAGE > $THRESHOLD" | bc -l) )); then
        echo "[WARN] $server CPU: ${CPU_USAGE}% 超过阈值" | tee -a /var/log/batch_monitor.log
    fi
done

Python + paramiko (或 fabric) 更稳定

import paramiko
import concurrent.futures
servers = [
    {"host": "192.168.1.101", "user": "monitor", "key": "/path/key"},
    # ...
]
def check_server(srv):
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(srv["host"], username=srv["user"], key_filename=srv["key"], timeout=5)
    stdin, stdout, stderr = ssh.exec_command("top -bn1 | head -3")
    output = stdout.read().decode()
    # 解析输出逻辑...
    ssh.close()
    return f"{srv['host']} OK"
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
    executor.map(check_server, servers)

批量监控日志异常

场景:监控多个服务器、多个应用日志,一旦出现 ERRORException 立即告警。

实用脚本思路(多文件 + 持续跟踪)

#!/bin/bash
LOG_FILES=("/var/log/app1/error.log" "/var/log/app2/error.log" "/var/log/app3/error.log")
KEYWORDS=("ERROR" "CRITICAL" "FATAL" "Exception")
while true; do
    for log_file in "${LOG_FILES[@]}"; do
        for keyword in "${KEYWORDS[@]}"; do
            # 使用 tail -F 连续追踪,或者用 grep 每次检查新增行
            tail -n0 -F "$log_file" 2>/dev/null | while read line; do
                if echo "$line" | grep -q "$keyword"; then
                    echo "$(date) [ALERT] 在 $log_file 中发现 '$keyword': $line" >> /var/log/batch_alarms.log
                fi
            done &
        done
    done
    sleep 60
done

注意:while 循环中的 tail -F 会持续阻塞,实际生产建议使用 supervisorsystemd 将每个监控脚本作为独立服务管理。


批量监控数据库连接池或慢查询

  • 编写一个脚本,循环连接多个 MySQL/PostgreSQL 实例,执行 SHOW PROCESSLIST 或慢查询日志分析。
  • 使用 mysql -h host -u user -p'password' -e "..." 批量获取状态,结果解析后聚合展示。

重要优化点(批量监控必须考虑的)

问题 解决方案
执行太慢(顺序执行) 使用 xargs -PGNU parallel 或 Python 的 ThreadPoolExecutor / asyncio
大量监控项导致占用过多系统资源 限流:控制并发数(如 max_workers=20)、增加采样间隔
结果分散,难以归总 统一标准输出格式(JSON),或写入同一数据库(如 InfluxDB)、ELK
告警重复/刷屏 实现“告警抑制”(如相同错误5分钟内只发一次),可用状态文件或Redis存储时间戳
脚本异常退出 使用 set -etrap 捕获退出信号;生产环境建议配合 systemd Timer 或 crontab 重试

一个“开箱即用”的批量监控实用脚本框架

如果你希望一个脚本搞定“批量+并发+日志+简单告警”,可以参考这个Python小框架:

import time, logging, concurrent.futures, requests
# 配置
TARGETS = ["https://site1.com", "https://site2.com", ...]  # 可从文件读取
CHECK_INTERVAL = 60   # 秒
CONCURRENCY = 20
ALARM_WEBHOOK = "https://hooks.example.com/alert"
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
def check_one(item):
    url = item if isinstance(item, str) else item.get("url")
    try:
        resp = requests.get(url, timeout=10)
        if resp.status_code != 200:
            raise Exception(f"HTTP {resp.status_code}")
        return {"status": "ok", "url": url}
    except Exception as e:
        return {"status": "fail", "url": url, "error": str(e)}
def send_alarm(msg):
    # 发送到钉钉、企业微信、Slack等
    requests.post(ALARM_WEBHOOK, json={"msg": msg})
def main():
    while True:
        with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
            results = executor.map(check_one, TARGETS)
            failures = [r for r in results if r["status"] == "fail"]
            if failures:
                msg = f"批量监控告警:{len(failures)} 个目标异常\n" + "\n".join([f['url'] for f in failures])
                logging.error(msg)
                send_alarm(msg)
            else:
                logging.info("全部监控目标正常")
        time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
    main()

实用脚本不仅能批量监控,而且非常适合快速搭建、灵活定制、轻量部署
如果你需要监控上千个目标,考虑引入 Prometheus + Blackbox ExporterNagios/Zabbix,但对于几十到几百个节点的日常监控,一个精心编写的脚本配合 cron 或 systemd timer 完全够用

需要针对特定场景(如 Windows 服务器、Docker 容器、Kubernetes Pod 等)的监控脚本思路,可以继续告诉我。

抱歉,评论功能暂时关闭!