本文目录导读:

可以的。实用脚本完全可以实现批量监控,而且这是脚本语言的强项,无论是监控服务器状态、网站可用性、日志异常,还是资源消耗,脚本都能通过循环、并发或调度实现高效批量操作。
针对不同场景,我整理了几种实用脚本批量监控方案与示例:
批量监控网站/API 是否在线
场景:有100个业务URL,需要每隔5分钟检查一次,异常时告警。
Shell 脚本示例(单线程,简单可靠)
#!/bin/bash
URLS=("https://example1.com" "https://example2.com" "https://example3.com")
TIMEOUT=5
LOG_FILE="/tmp/web_monitor.log"
for url in "${URLS[@]}"; do
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" --max-time $TIMEOUT "$url")
if [ "$HTTP_CODE" -ne 200 ]; then
echo "$(date) [ERROR] $url 返回状态码: $HTTP_CODE" >> $LOG_FILE
# 这里可以插入发送邮件、钉钉/企业微信机器人消息等
else
echo "$(date) [OK] $url 正常" >> $LOG_FILE
fi
done
优化建议:对于大量URL,上述脚本是顺序执行的,100个URL大约耗时 100×5s=500秒,要提速,可以改用 xargs -P 并行:
cat url_list.txt | xargs -I {} -P 20 sh -c 'curl -s -o /dev/null -w "%{http_code}" --max-time 5 "$1 || echo "Failed"'
Python 脚本示例(功能更灵活,支持并发)
import requests
import concurrent.futures
import time
urls = ["https://site1.com", "https://site2.com", ...]
def check_url(url):
try:
r = requests.get(url, timeout=5)
if r.status_code != 200:
return f"{url} 异常,状态码: {r.status_code}"
return f"{url} 正常"
except Exception as e:
return f"{url} 异常: {e}"
# 使用线程池并发监控,同时监控20个
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
results = executor.map(check_url, urls)
for result in results:
print(result)
# 写入日志或发送告警
批量监控服务器资源(CPU、内存、磁盘)
场景:你有50台Linux服务器,需要批量检查CPU使用率是否超过90%,内存是否不足。
SSH + Shell 批量执行
#!/bin/bash
SERVERS=("192.168.1.101" "192.168.1.102" "192.168.1.103")
USER="monitor"
KEY="/path/to/ssh_key"
THRESHOLD=90
for server in "${SERVERS[@]}"; do
# 远程执行 top 或 free 命令,抓取CPU/内存值
CPU_USAGE=$(ssh -i $KEY -o ConnectTimeout=5 $USER@$server "top -bn1 | grep 'Cpu(s)' | awk '{print \$2}' | cut -d'%' -f1")
MEM_USAGE=$(ssh -i $KEY -o ConnectTimeout=5 $USER@$server "free | grep Mem | awk '{print \$3/\$2 * 100.0}'")
if (( $(echo "$CPU_USAGE > $THRESHOLD" | bc -l) )); then
echo "[WARN] $server CPU: ${CPU_USAGE}% 超过阈值" | tee -a /var/log/batch_monitor.log
fi
done
Python + paramiko (或 fabric) 更稳定
import paramiko
import concurrent.futures
servers = [
{"host": "192.168.1.101", "user": "monitor", "key": "/path/key"},
# ...
]
def check_server(srv):
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
ssh.connect(srv["host"], username=srv["user"], key_filename=srv["key"], timeout=5)
stdin, stdout, stderr = ssh.exec_command("top -bn1 | head -3")
output = stdout.read().decode()
# 解析输出逻辑...
ssh.close()
return f"{srv['host']} OK"
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
executor.map(check_server, servers)
批量监控日志异常
场景:监控多个服务器、多个应用日志,一旦出现 ERROR 或 Exception 立即告警。
实用脚本思路(多文件 + 持续跟踪)
#!/bin/bash
LOG_FILES=("/var/log/app1/error.log" "/var/log/app2/error.log" "/var/log/app3/error.log")
KEYWORDS=("ERROR" "CRITICAL" "FATAL" "Exception")
while true; do
for log_file in "${LOG_FILES[@]}"; do
for keyword in "${KEYWORDS[@]}"; do
# 使用 tail -F 连续追踪,或者用 grep 每次检查新增行
tail -n0 -F "$log_file" 2>/dev/null | while read line; do
if echo "$line" | grep -q "$keyword"; then
echo "$(date) [ALERT] 在 $log_file 中发现 '$keyword': $line" >> /var/log/batch_alarms.log
fi
done &
done
done
sleep 60
done
注意:
while循环中的tail -F会持续阻塞,实际生产建议使用 supervisor 或 systemd 将每个监控脚本作为独立服务管理。
批量监控数据库连接池或慢查询
- 编写一个脚本,循环连接多个 MySQL/PostgreSQL 实例,执行
SHOW PROCESSLIST或慢查询日志分析。 - 使用
mysql -h host -u user -p'password' -e "..."批量获取状态,结果解析后聚合展示。
重要优化点(批量监控必须考虑的)
| 问题 | 解决方案 |
|---|---|
| 执行太慢(顺序执行) | 使用 xargs -P、GNU parallel 或 Python 的 ThreadPoolExecutor / asyncio |
| 大量监控项导致占用过多系统资源 | 限流:控制并发数(如 max_workers=20)、增加采样间隔 |
| 结果分散,难以归总 | 统一标准输出格式(JSON),或写入同一数据库(如 InfluxDB)、ELK |
| 告警重复/刷屏 | 实现“告警抑制”(如相同错误5分钟内只发一次),可用状态文件或Redis存储时间戳 |
| 脚本异常退出 | 使用 set -e、trap 捕获退出信号;生产环境建议配合 systemd Timer 或 crontab 重试 |
一个“开箱即用”的批量监控实用脚本框架
如果你希望一个脚本搞定“批量+并发+日志+简单告警”,可以参考这个Python小框架:
import time, logging, concurrent.futures, requests
# 配置
TARGETS = ["https://site1.com", "https://site2.com", ...] # 可从文件读取
CHECK_INTERVAL = 60 # 秒
CONCURRENCY = 20
ALARM_WEBHOOK = "https://hooks.example.com/alert"
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
def check_one(item):
url = item if isinstance(item, str) else item.get("url")
try:
resp = requests.get(url, timeout=10)
if resp.status_code != 200:
raise Exception(f"HTTP {resp.status_code}")
return {"status": "ok", "url": url}
except Exception as e:
return {"status": "fail", "url": url, "error": str(e)}
def send_alarm(msg):
# 发送到钉钉、企业微信、Slack等
requests.post(ALARM_WEBHOOK, json={"msg": msg})
def main():
while True:
with concurrent.futures.ThreadPoolExecutor(max_workers=CONCURRENCY) as executor:
results = executor.map(check_one, TARGETS)
failures = [r for r in results if r["status"] == "fail"]
if failures:
msg = f"批量监控告警:{len(failures)} 个目标异常\n" + "\n".join([f['url'] for f in failures])
logging.error(msg)
send_alarm(msg)
else:
logging.info("全部监控目标正常")
time.sleep(CHECK_INTERVAL)
if __name__ == "__main__":
main()
实用脚本不仅能批量监控,而且非常适合快速搭建、灵活定制、轻量部署。
如果你需要监控上千个目标,考虑引入 Prometheus + Blackbox Exporter 或 Nagios/Zabbix,但对于几十到几百个节点的日常监控,一个精心编写的脚本配合 cron 或 systemd timer 完全够用。
需要针对特定场景(如 Windows 服务器、Docker 容器、Kubernetes Pod 等)的监控脚本思路,可以继续告诉我。