本文目录导读:

我来分享几个实用的网站宕机监控脚本,从简单到高级都有。
最基础的Shell脚本(逐个检查站点列表)
#!/bin/bash
# Website uptime check: probes every URL in URLS once and prints one
# timestamped UP/DOWN line per site. Intended to be run periodically.

# URLs to probe.
URLS=("https://example.com" "https://google.com" "https://github.com")

# Alert recipient (optional). Leave empty to disable e-mail alerts.
# NOTE: placeholder address - replace with a real mailbox before use.
ALERT_EMAIL="admin@example.com"

# check_site URL
#   Requests URL with curl and prints "<timestamp> - <url> - UP|DOWN (<code>)".
#   Returns 0 when the HTTP status is 200, 301 or 302, and 1 otherwise.
check_site() {
    local url="$1"
    local status_code

    # Assigned separately from the `local` declaration so that `local`
    # does not mask the exit status of the command substitution.
    status_code=$(curl -s -o /dev/null -w "%{http_code}" --connect-timeout 10 --max-time 30 "$url")

    case "$status_code" in
        200|301|302)
            echo "$(date '+%Y-%m-%d %H:%M:%S') - $url - UP ($status_code)"
            return 0
            ;;
        *)
            echo "$(date '+%Y-%m-%d %H:%M:%S') - $url - DOWN ($status_code)"
            return 1
            ;;
    esac
}

# Main loop: check every site and e-mail an alert for each one that is down.
for url in "${URLS[@]}"; do
    if ! check_site "$url"; then
        # The alert address is optional, so only mail when one is configured.
        if [[ -n "$ALERT_EMAIL" ]]; then
            if command -v mail >/dev/null 2>&1; then
                echo "Alert: $url is down!" | mail -s "网站宕机告警" "$ALERT_EMAIL"
            else
                echo "mail command not found; alert for $url was not sent" >&2
            fi
        fi
    fi
done
Python脚本(多站点并发检测)
#!/usr/bin/env python3
import requests
import time
import threading
import smtplib
from email.mime.text import MIMEText
import json
from datetime import datetime
class SiteMonitor:
    """Check a list of sites concurrently and e-mail an alert on repeated failure.

    The configuration is a dict with a ``sites`` list (each entry needs a
    ``url`` and may carry ``name``, ``check_interval`` and ``timeout``) plus
    the optional top-level keys ``alert_email`` and ``alert_threshold``.
    """

    DEFAULT_INTERVAL = 60        # seconds between rounds when a site sets none
    DEFAULT_TIMEOUT = 10         # seconds before a request is abandoned
    DEFAULT_ALERT_THRESHOLD = 2  # consecutive failures before alerting

    def __init__(self, config_file='sites.json'):
        """Load the configuration and initialise the per-site state."""
        self.sites = self.load_config(config_file)
        # Latest status dict per URL, as produced by check_site().
        self.results = {}
        # Consecutive-failure counter per URL. Created here so the class
        # works without the caller having to attach the attribute first.
        self.failure_counts = {}

    def load_config(self, config_file):
        """Return the configuration parsed from the JSON file *config_file*.

        Falls back to a built-in two-site default configuration (and prints a
        notice) when the file does not exist.
        """
        default_config = {
            "sites": [
                {
                    "url": "https://example.com",
                    "name": "Example Site",
                    "check_interval": 60,
                    "timeout": 10,
                },
                {
                    "url": "https://google.com",
                    "name": "Google",
                    "check_interval": 120,
                    "timeout": 10,
                },
            ],
            # Placeholder address - replace with a real mailbox before use.
            "alert_email": "admin@example.com",
            "alert_threshold": 2,  # consecutive failures before alerting
        }
        try:
            # JSON is UTF-8 by specification; do not depend on the locale.
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            print(f"配置文件 {config_file} 不存在,使用默认配置")
            return default_config

    @staticmethod
    def _site_name(site):
        """Return the site's display name, falling back to its URL."""
        return site.get('name', site['url'])

    def _failure_status(self, site, status_code, response_time, error):
        """Build the status dict for a check that produced no usable response."""
        return {
            'url': site['url'],
            'name': self._site_name(site),
            'status_code': status_code,
            'response_time': response_time,
            'timestamp': datetime.now().isoformat(),
            'is_up': False,
            'error': error,
        }

    def check_site(self, site):
        """Request ``site['url']`` once and return a status dict.

        The dict always has the keys ``url``, ``name``, ``status_code``,
        ``response_time``, ``timestamp`` and ``is_up``; ``error`` is added
        when the check failed. ``status_code`` is 0 for timeouts and
        connection failures and -1 for any other exception.
        """
        timeout = site.get('timeout', self.DEFAULT_TIMEOUT)
        started = time.monotonic()
        try:
            response = requests.get(
                site['url'],
                timeout=timeout,
                headers={
                    'User-Agent': 'Mozilla/5.0 (compatible; SiteMonitor/1.0)'
                },
            )
            status = {
                'url': site['url'],
                'name': self._site_name(site),
                'status_code': response.status_code,
                'response_time': round(time.monotonic() - started, 2),
                'timestamp': datetime.now().isoformat(),
                'is_up': response.status_code < 400,
            }
            # Treat the word "error" near the top of the body as a failure.
            # Slice before lowering so only the inspected prefix is copied.
            if 'error' in response.text[:1000].lower():
                status['is_up'] = False
                status['error'] = 'Error keyword found in response'
        except requests.exceptions.Timeout:
            status = self._failure_status(site, 0, timeout, 'Timeout')
        except requests.exceptions.ConnectionError:
            status = self._failure_status(site, 0, 0, 'Connection failed')
        except Exception as e:
            # Best effort: one misbehaving site must not stop the monitor.
            status = self._failure_status(site, -1, 0, str(e))
        return status

    def send_alert(self, site_status):
        """E-mail an outage notice through the SMTP server on localhost.

        Does nothing when no ``alert_email`` is configured. The configured
        address is used as both sender and recipient. Delivery errors are
        printed, not raised.
        """
        email_config = self.sites.get('alert_email', '')
        if not email_config:
            return
        body = "\n".join([
            "",
            "网站宕机告警",
            f"时间: {site_status['timestamp']}",
            f"名称: {site_status['name']}",
            f"URL: {site_status['url']}",
            f"状态码: {site_status['status_code']}",
            f"错误: {site_status.get('error', 'Unknown')}",
            "请立即处理!",
            "",
        ])
        msg = MIMEText(body)
        msg['Subject'] = f"[ALERT] {site_status['name']} 宕机"
        msg['From'] = email_config
        msg['To'] = email_config
        try:
            with smtplib.SMTP('localhost') as server:
                server.send_message(msg)
            print(f"告警邮件已发送至 {email_config}")
        except Exception as e:
            print(f"发送告警邮件失败: {e}")

    def _sleep_seconds(self):
        """Return the pause between rounds: the smallest ``check_interval``.

        Falls back to the default interval when no sites are configured, so
        an empty site list does not crash the loop.
        """
        return min(
            (site.get('check_interval', self.DEFAULT_INTERVAL)
             for site in self.sites['sites']),
            default=self.DEFAULT_INTERVAL,
        )

    def monitor(self):
        """Run check rounds forever, one thread per site in each round.

        Every site is checked in every round; the pause between rounds is
        the smallest configured ``check_interval``.
        """
        print("=== 网站监控系统启动 ===")
        print(f"监控 {len(self.sites['sites'])} 个站点...")
        while True:
            threads = [
                threading.Thread(target=self.check_site_thread, args=(site,))
                for site in self.sites['sites']
            ]
            for thread in threads:
                thread.start()
            # Wait for the whole round before printing the summary.
            for thread in threads:
                thread.join()
            self.print_summary()
            time.sleep(self._sleep_seconds())

    def check_site_thread(self, site):
        """Check one site, update its failure counter, alert and log.

        An alert is sent on every failed check once the consecutive-failure
        count has reached ``alert_threshold``; a successful check resets the
        counter.
        """
        url = site['url']
        status = self.check_site(site)
        if status['is_up']:
            self.failure_counts[url] = 0
        else:
            self.failure_counts[url] = self.failure_counts.get(url, 0) + 1
            threshold = self.sites.get(
                'alert_threshold', self.DEFAULT_ALERT_THRESHOLD)
            if self.failure_counts[url] >= threshold:
                self.send_alert(status)
        self.results[url] = status
        self.log_status(status)

    def log_status(self, status):
        """Append one pipe-separated line for *status* to this month's log file."""
        log_file = f"monitor_{datetime.now().strftime('%Y%m')}.log"
        log_line = (
            f"{status['timestamp']}|{status['name']}|{status['url']}"
            f"|UP:{status['is_up']}|Code:{status['status_code']}"
            f"|Time:{status['response_time']}s\n"
        )
        # Site names may be non-ASCII, so the encoding is fixed explicitly.
        with open(log_file, 'a', encoding='utf-8') as f:
            f.write(log_line)

    def print_summary(self):
        """Print one line per site with its latest recorded status."""
        print(f"\n=== {datetime.now().strftime('%H:%M:%S')} 状态摘要 ===")
        for status in self.results.values():
            emoji = "✅" if status['is_up'] else "❌"
            print(f"{emoji} {status['name']}: {'UP' if status['is_up'] else 'DOWN'} "
                  f"(Code: {status['status_code']}, Time: {status['response_time']}s)")
if __name__ == "__main__":
    monitor = SiteMonitor()
    # Per-URL consecutive-failure counters used by check_site_thread().
    monitor.failure_counts = {}
    try:
        monitor.monitor()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the otherwise endless loop.
        print("\n监控停止")
配合Docker的监控脚本
# Dockerfile
FROM python:3.9-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY monitor.py .
COPY sites.json .
CMD ["python", "monitor.py"]

# requirements.txt
requests>=2.25.0
高级监控脚本(带Webhook通知)
#!/usr/bin/env python3
import requests
import time
import json
from datetime import datetime
class AdvancedMonitor:
    """Uptime monitor with retries, keyword checks and webhook/Telegram alerts.

    Sites are checked sequentially. A site counts as up only when its status
    code is expected, all required keywords are present in the body and the
    response arrived within ``advanced.max_response_time`` seconds.
    """

    def __init__(self, config_file='monitor_config.json'):
        """Load the configuration used by every other method."""
        self.config = self.load_config(config_file)

    def load_config(self, config_file):
        """Return the built-in defaults, overridden by *config_file* if present.

        The file is JSON with the same layout as the defaults; nested
        sections are merged key by key, so a file may override a single
        setting. A missing file silently yields the defaults.

        Raises:
            ValueError: if the file's top-level JSON value is not an object.
        """
        defaults = {
            "sites": [
                {
                    "url": "https://example.com",
                    "name": "Example",
                    "expected_status": [200, 301, 302],
                    "check_interval": 60,
                    "keywords": ["success", "welcome"],  # optional body keywords
                }
            ],
            "notifications": {
                "webhook": {
                    "enabled": True,
                    "url": "https://hooks.slack.com/services/xxx/yyy/zzz",
                    "method": "POST",
                },
                "telegram": {
                    "enabled": False,
                    "bot_token": "YOUR_BOT_TOKEN",
                    "chat_id": "YOUR_CHAT_ID",
                },
            },
            "advanced": {
                "max_response_time": 5,  # seconds
                "retry_count": 3,
                "down_threshold": 3,  # consecutive failures before alerting
            },
        }
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                loaded = json.load(f)
        except FileNotFoundError:
            return defaults
        if not isinstance(loaded, dict):
            raise ValueError(f"{config_file}: top-level JSON value must be an object")
        return self._merge_config(defaults, loaded)

    @staticmethod
    def _merge_config(base, override):
        """Return *base* updated with *override*, merging nested dicts recursively."""
        merged = dict(base)
        for key, value in override.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = AdvancedMonitor._merge_config(merged[key], value)
            else:
                merged[key] = value
        return merged

    @staticmethod
    def _escape(value):
        """Return *value* as text with ``&``, ``<`` and ``>`` HTML-escaped."""
        import html  # local import keeps this class self-contained
        return html.escape(str(value), quote=False)

    def check_with_retry(self, site):
        """Check *site*, retrying failed requests and overly slow responses.

        Returns a dict whose ``status`` is ``'up'``, ``'error'`` (a response
        arrived but failed a check) or ``'down'`` (the request itself failed
        on every attempt). ``attempt`` is always present; ``status_code`` and
        ``response_time`` are present whenever a response was received, and
        ``message`` whenever the status is not ``'up'``.
        """
        advanced = self.config['advanced']
        # Always try at least once, even if retry_count is configured as 0.
        max_retries = max(1, advanced['retry_count'])
        max_response_time = advanced['max_response_time']

        for attempt in range(1, max_retries + 1):
            is_last_attempt = attempt == max_retries
            try:
                started = time.time()
                response = requests.get(
                    site['url'],
                    timeout=10,
                    verify=True,
                    allow_redirects=True,
                )
                response_time = time.time() - started
            except Exception as e:
                # Best effort: any request failure counts as "down".
                if not is_last_attempt:
                    time.sleep(2)  # brief pause before retrying
                    continue
                return {
                    'status': 'down',
                    'message': str(e),
                    'attempt': attempt,
                }

            measured = {
                'status_code': response.status_code,
                'response_time': response_time,
                'attempt': attempt,
            }
            if response.status_code not in site.get('expected_status', [200]):
                return {
                    'status': 'error',
                    'message': f'Unexpected status code: {response.status_code}',
                    **measured,
                }
            if not all(kw in response.text for kw in site.get('keywords', [])):
                return {
                    'status': 'error',
                    'message': 'Required keywords not found',
                    **measured,
                }
            if response_time <= max_response_time:
                return {'status': 'up', **measured}
            # Only the response time failed: retry immediately, and report an
            # error after the last attempt instead of returning None.
            if is_last_attempt:
                return {
                    'status': 'error',
                    'message': (f'Response too slow: {response_time:.2f}s '
                                f'(limit {max_response_time}s)'),
                    **measured,
                }

    def send_telegram_alert(self, message):
        """Send *message* through the Telegram bot, if Telegram is enabled.

        The message is sent with ``parse_mode=HTML``. Delivery errors,
        including HTTP error responses, are printed rather than raised.
        """
        telegram = self.config['notifications']['telegram']
        if not telegram['enabled']:
            return
        url = f"https://api.telegram.org/bot{telegram['bot_token']}/sendMessage"
        data = {
            "chat_id": telegram['chat_id'],
            "text": message,
            "parse_mode": "HTML",
        }
        try:
            # raise_for_status surfaces API rejections that would otherwise
            # pass silently as a non-2xx response.
            requests.post(url, data=data, timeout=10).raise_for_status()
        except Exception as e:
            print(f"Telegram通知失败: {e}")

    def send_webhook_alert(self, message):
        """POST *message* as JSON to the configured webhook, if enabled.

        Delivery errors, including HTTP error responses, are printed rather
        than raised.
        """
        webhook = self.config['notifications']['webhook']
        if not webhook['enabled']:
            return
        payload = {
            "text": message,
            "timestamp": datetime.now().isoformat(),
        }
        try:
            requests.post(webhook['url'], json=payload, timeout=10).raise_for_status()
        except Exception as e:
            print(f"Webhook通知失败: {e}")

    def _notify(self, message):
        """Send *message* to every notification channel."""
        self.send_webhook_alert(message)
        self.send_telegram_alert(message)

    def _check_and_notify(self, site, failure_counts, last_checks):
        """Check one site, update the shared counters and send notifications."""
        url = site['url']
        result = self.check_with_retry(site)
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

        if result['status'] == 'up':
            failure_counts[url] = 0
            status_str = f"✅ {site['name']}: UP ({result['response_time']:.2f}s)"
            # Announce recovery when the previous check of this site failed.
            if last_checks.get(url, {}).get('status') in ('down', 'error'):
                self._notify("\n".join([
                    "",
                    "🟢 <b>网站已恢复</b> 🟢",
                    f"站点: {self._escape(site['name'])}",
                    f"URL: {self._escape(url)}",
                    f"时间: {timestamp}",
                    f"响应时间: {result['response_time']:.2f}s",
                    "",
                ]))
        else:
            failure_counts[url] = failure_counts.get(url, 0) + 1
            status_str = f"❌ {site['name']}: {result['status']}"
            if failure_counts[url] >= self.config['advanced']['down_threshold']:
                # Dynamic text is escaped because the message is HTML-formatted
                # and exception strings often contain "<...>" fragments.
                self._notify("\n".join([
                    "",
                    "⚠️ <b>网站宕机告警</b> ⚠️",
                    f"站点: {self._escape(site['name'])}",
                    f"URL: {self._escape(url)}",
                    f"时间: {timestamp}",
                    f"状态: {result['status']}",
                    f"错误: {self._escape(result.get('message', 'N/A'))}",
                    f"尝试次数: {result['attempt']}",
                    "",
                ]))
                # Reset the counter so the next alert needs a fresh run of failures.
                failure_counts[url] = 0

        last_checks[url] = result
        print(f"[{timestamp}] {status_str}")

    def monitor_loop(self):
        """Check every site in turn, forever.

        The pause between rounds is the smallest ``check_interval`` of any
        site (60 seconds when no sites are configured).
        """
        print("🚀 高级监控系统启动")
        failure_counts = {site['url']: 0 for site in self.config['sites']}
        last_checks = {}
        while True:
            for site in self.config['sites']:
                self._check_and_notify(site, failure_counts, last_checks)
            min_interval = min(
                (site.get('check_interval', 60) for site in self.config['sites']),
                default=60,
            )
            time.sleep(min_interval)
if __name__ == "__main__":
    monitor = AdvancedMonitor()
    try:
        monitor.monitor_loop()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop the otherwise endless loop.
        print("\n监控停止")
快速使用指南
基础脚本使用
# 保存脚本并赋予执行权限
chmod +x monitor.sh
# 添加到crontab,每分钟检查一次
crontab -e
* * * * * /path/to/monitor.sh >> /var/log/site_monitor.log 2>&1
Python脚本使用
# 安装依赖
pip install requests
# 创建配置文件
cat > sites.json << EOF
{
"sites": [
{"url": "https://your-site.com", "name": "Your Site", "check_interval": 60}
]
}
EOF
# 运行脚本
python3 monitor.py
系统服务化
# /etc/systemd/system/site-monitor.service
[Unit]
Description=Website Monitor Service
After=network.target

[Service]
Type=simple
User=nobody
WorkingDirectory=/opt/monitor
ExecStart=/usr/bin/python3 /opt/monitor/monitor.py
Restart=always
RestartSec=10

[Install]
WantedBy=multi-user.target
# 启用服务
systemctl enable site-monitor
systemctl start site-monitor
这些脚本可以根据你的需求进行修改和扩展,建议同时配合监控服务(如Uptime Robot、StatusCake等)使用,以获得更可靠的多区域监控。