如何用实用脚本自动检查网站是否宕机?

wen 实用脚本 4

本文目录导读:

如何用实用脚本自动检查网站是否宕机?

  1. 最基础的Shell脚本(单网站检查)
  2. Python脚本(多站点并发检测)
  3. 配合Docker的监控脚本
  4. 高级监控脚本(带Webhook通知)
  5. 快速使用指南

我来分享几个实用的网站宕机监控脚本,从简单到高级都有。

最基础的Shell脚本(单网站检查)

#!/bin/bash
# Sites to probe; add or remove URLs as needed.
URLS=("https://example.com" "https://google.com" "https://github.com")
# Alert recipient (optional). Placeholder address - replace it with a real mailbox.
ALERT_EMAIL="admin@example.com"
# Probe a single URL and report whether it is reachable.
#   $1 - URL to check
# Prints one timestamped UP/DOWN line that includes the HTTP status code.
# Returns 0 when the site is up, 1 otherwise.
check_site() {
    local url="$1"
    # Declare and assign separately so `local` does not mask curl's exit status.
    local status_code
    status_code=$(curl -s -o /dev/null -w "%{http_code}" --connect-timeout 10 --max-time 30 "$url")
    case "$status_code" in
        2[0-9][0-9]|3[0-9][0-9])
            # Any 2xx/3xx answer (not only 200/301/302) means the server is responding.
            echo "$(date '+%Y-%m-%d %H:%M:%S') - $url - UP ($status_code)"
            return 0
            ;;
        *)
            # Covers 4xx/5xx as well as 000, which curl prints when it got no HTTP response.
            echo "$(date '+%Y-%m-%d %H:%M:%S') - $url - DOWN ($status_code)"
            return 1
            ;;
    esac
}
# Main loop: check every configured site and mail an alert for each one that is down.
for url in "${URLS[@]}"; do
    if ! check_site "$url"; then
        # The alert address is optional: skip mailing when it is empty or
        # when no `mail` command is installed on this host.
        if [[ -n "${ALERT_EMAIL:-}" ]] && command -v mail >/dev/null 2>&1; then
            echo "Alert: $url is down!" | mail -s "网站宕机告警" "$ALERT_EMAIL"
        fi
    fi
done

Python脚本(多站点并发检测)

#!/usr/bin/env python3
import requests
import time
import threading
import smtplib
from email.mime.text import MIMEText
import json
from datetime import datetime
class SiteMonitor:
    """Poll a list of websites concurrently and e-mail an alert when one is down.

    The configuration is a dict with the keys ``sites`` (a list of dicts with
    ``url``, ``name``, ``check_interval`` and ``timeout``), ``alert_email``
    and ``alert_threshold`` (consecutive failures required before alerting).
    """

    def __init__(self, config_file='sites.json'):
        """Load the configuration and create empty monitoring state.

        Args:
            config_file: Path of the JSON configuration file.
        """
        self.sites = self.load_config(config_file)
        # Latest status dict per URL, filled in by the worker threads.
        self.results = {}
        # Consecutive-failure counter per URL. Created here so the monitor
        # works without any set-up by the caller.
        self.failure_counts = {}
        # One worker thread runs per site; this lock serialises their access
        # to the shared dicts and to the log file.
        self._lock = threading.Lock()

    def load_config(self, config_file):
        """Load the configuration from a JSON file.

        Args:
            config_file: Path of the JSON configuration file.

        Returns:
            The parsed configuration, or a built-in default configuration
            when the file does not exist.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
        """
        default_config = {
            "sites": [
                {
                    "url": "https://example.com",
                    "name": "Example Site",
                    "check_interval": 60,
                    "timeout": 10
                },
                {
                    "url": "https://google.com",
                    "name": "Google",
                    "check_interval": 120,
                    "timeout": 10
                }
            ],
            # Placeholder address - replace it with a real mailbox.
            "alert_email": "admin@example.com",
            "alert_threshold": 2  # consecutive failures before alerting
        }
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                return json.load(f)
        except FileNotFoundError:
            print(f"配置文件 {config_file} 不存在,使用默认配置")
            return default_config

    @staticmethod
    def _failed_status(site, status_code, response_time, error):
        """Build the status dict for a request that produced no response."""
        return {
            'url': site['url'],
            # Fall back to the URL so a config entry without a name still works.
            'name': site.get('name', site['url']),
            'status_code': status_code,
            'response_time': response_time,
            'timestamp': datetime.now().isoformat(),
            'is_up': False,
            'error': error
        }

    def check_site(self, site):
        """Check a single site with one HTTP GET request.

        Args:
            site: Site dict; ``url`` is required, ``name`` and ``timeout``
                are optional.

        Returns:
            A status dict with the keys ``url``, ``name``, ``status_code``,
            ``response_time``, ``timestamp`` and ``is_up``, plus ``error``
            when the site is considered down. ``status_code`` is 0 for
            timeouts and connection failures and -1 for any other exception.
        """
        timeout = site.get('timeout', 10)
        start_time = time.time()
        try:
            response = requests.get(
                site['url'],
                timeout=timeout,
                headers={
                    'User-Agent': 'Mozilla/5.0 (compatible; SiteMonitor/1.0)'
                }
            )
        # Timeout must be tested before ConnectionError: a connect timeout
        # is an instance of both.
        except requests.exceptions.Timeout:
            return self._failed_status(site, 0, timeout, 'Timeout')
        except requests.exceptions.ConnectionError:
            return self._failed_status(site, 0, 0, 'Connection failed')
        except Exception as e:
            # Deliberately broad: any other failure to fetch counts as "down"
            # and must not kill the worker thread.
            return self._failed_status(site, -1, 0, str(e))

        status = {
            'url': site['url'],
            'name': site.get('name', site['url']),
            'status_code': response.status_code,
            'response_time': round(time.time() - start_time, 2),
            'timestamp': datetime.now().isoformat(),
            'is_up': response.status_code < 400
        }
        # Heuristic content check: the word "error" near the top of the body
        # marks the site as down. Only the first 1000 characters are examined.
        if 'error' in response.text[:1000].lower():
            status['is_up'] = False
            status['error'] = 'Error keyword found in response'
        return status

    def send_alert(self, site_status):
        """Send an alert e-mail through the SMTP server on localhost.

        Does nothing when no ``alert_email`` is configured. Delivery errors
        are printed, not raised.

        Args:
            site_status: Status dict as returned by ``check_site``.
        """
        email_config = self.sites.get('alert_email', '')
        if not email_config:
            return
        msg = MIMEText(f"""
网站宕机告警
时间: {site_status['timestamp']}
名称: {site_status['name']}
URL: {site_status['url']}
状态码: {site_status['status_code']}
错误: {site_status.get('error', 'Unknown')}
请立即处理!
        """)
        msg['Subject'] = f"[ALERT] {site_status['name']} 宕机"
        # The same address is used as sender and recipient.
        msg['From'] = email_config
        msg['To'] = email_config
        try:
            with smtplib.SMTP('localhost') as server:
                server.send_message(msg)
            print(f"告警邮件已发送至 {email_config}")
        except Exception as e:
            print(f"发送告警邮件失败: {e}")

    def monitor(self):
        """Run the monitoring loop until interrupted.

        Every cycle checks all sites in parallel (one thread per site),
        prints a summary and then sleeps for the smallest configured
        ``check_interval``. Returns immediately when no sites are configured.
        """
        sites = self.sites.get('sites', [])
        print("=== 网站监控系统启动 ===")
        print(f"监控 {len(sites)} 个站点...")
        if not sites:
            # min() over an empty site list would raise ValueError below.
            print("没有配置任何站点,监控退出")
            return
        # The configuration is not reloaded, so the interval is loop-invariant.
        interval = min(site.get('check_interval', 60) for site in sites)
        while True:
            threads = [
                threading.Thread(target=self.check_site_thread, args=(site,))
                for site in sites
            ]
            for thread in threads:
                thread.start()
            # Wait for every check to finish before printing the summary.
            for thread in threads:
                thread.join()
            self.print_summary()
            time.sleep(interval)

    def check_site_thread(self, site):
        """Thread entry point: check one site, update counters, alert and log.

        Args:
            site: Site dict as accepted by ``check_site``.
        """
        status = self.check_site(site)
        url = site['url']
        threshold = self.sites.get('alert_threshold', 2)
        with self._lock:
            if status['is_up']:
                self.failure_counts[url] = 0
                should_alert = False
            else:
                # Track consecutive failures; alert once the threshold is met.
                self.failure_counts[url] = self.failure_counts.get(url, 0) + 1
                should_alert = self.failure_counts[url] >= threshold
            self.results[url] = status
        # Mail outside the lock so a slow SMTP server does not block other workers.
        if should_alert:
            self.send_alert(status)
        self.log_status(status)

    def log_status(self, status):
        """Append one status line to the log file of the current month.

        The file is named ``monitor_YYYYMM.log`` and is created in the
        current working directory.

        Args:
            status: Status dict as returned by ``check_site``.
        """
        log_file = f"monitor_{datetime.now().strftime('%Y%m')}.log"
        log_line = f"{status['timestamp']}|{status['name']}|{status['url']}|UP:{status['is_up']}|Code:{status['status_code']}|Time:{status['response_time']}s\n"
        # Explicit UTF-8 so non-ASCII site names do not depend on the locale;
        # the lock keeps lines from concurrent workers from interleaving.
        with self._lock:
            with open(log_file, 'a', encoding='utf-8') as f:
                f.write(log_line)

    def print_summary(self):
        """Print the latest known status of every checked site."""
        print(f"\n=== {datetime.now().strftime('%H:%M:%S')} 状态摘要 ===")
        for status in self.results.values():
            emoji = "✅" if status['is_up'] else "❌"
            print(f"{emoji} {status['name']}: {'UP' if status['is_up'] else 'DOWN'} "
                  f"(Code: {status['status_code']}, Time: {status['response_time']}s)")
if __name__ == "__main__":
    site_monitor = SiteMonitor()
    # Attach the per-URL failure counters before the monitoring loop starts.
    site_monitor.failure_counts = {}
    try:
        site_monitor.monitor()
    except KeyboardInterrupt:
        # Ctrl-C stops the monitoring loop; exit with a short message.
        print("\n监控停止")

配合Docker的监控脚本

# Dockerfile
FROM python:3.9-slim
# Python buffers stdout when it is not attached to a TTY; disable that so the
# monitor's print() output appears in `docker logs` immediately.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Install dependencies first so this layer stays cached until requirements.txt changes.
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY monitor.py .
COPY sites.json .
CMD ["python", "monitor.py"]
# requirements.txt
# HTTP client library imported by monitor.py.
requests>=2.25.0

高级监控脚本(带Webhook通知)

#!/usr/bin/env python3
import requests
import time
import json
from datetime import datetime
class AdvancedMonitor:
    """Website monitor with retries, keyword and latency checks and
    webhook / Telegram notifications.
    """

    def __init__(self, config_file='monitor_config.json'):
        """Load the configuration.

        Args:
            config_file: Path of an optional JSON file whose values override
                the built-in defaults.
        """
        self.config = self.load_config(config_file)

    @staticmethod
    def _merge_config(defaults, overrides):
        """Return ``defaults`` updated with ``overrides``, merging nested dicts."""
        merged = dict(defaults)
        for key, value in overrides.items():
            if isinstance(value, dict) and isinstance(merged.get(key), dict):
                merged[key] = AdvancedMonitor._merge_config(merged[key], value)
            else:
                merged[key] = value
        return merged

    def load_config(self, config_file):
        """Build the configuration from defaults plus an optional JSON file.

        Args:
            config_file: Path of the JSON configuration file.

        Returns:
            The default configuration when the file does not exist;
            otherwise the defaults with the file's values merged on top, so
            a partial file only needs the keys it wants to change.

        Raises:
            json.JSONDecodeError: If the file exists but is not valid JSON.
            ValueError: If the file's top-level value is not a JSON object.
        """
        config = {
            "sites": [
                {
                    "url": "https://example.com",
                    "name": "Example",
                    "expected_status": [200, 301, 302],
                    "check_interval": 60,
                    "keywords": ["success", "welcome"]  # optional: required keywords
                }
            ],
            "notifications": {
                "webhook": {
                    "enabled": True,
                    "url": "https://hooks.slack.com/services/xxx/yyy/zzz",
                    "method": "POST"
                },
                "telegram": {
                    "enabled": False,
                    "bot_token": "YOUR_BOT_TOKEN",
                    "chat_id": "YOUR_CHAT_ID"
                }
            },
            "advanced": {
                "max_response_time": 5,  # seconds
                "retry_count": 3,
                "down_threshold": 3  # consecutive failures that trigger an alert
            }
        }
        try:
            with open(config_file, 'r', encoding='utf-8') as f:
                overrides = json.load(f)
        except FileNotFoundError:
            return config
        if not isinstance(overrides, dict):
            raise ValueError(f"{config_file}: top-level JSON value must be an object")
        return self._merge_config(config, overrides)

    def check_with_retry(self, site):
        """Check one site, retrying when the request itself fails.

        Args:
            site: Site dict; ``url`` is required, ``expected_status`` and
                ``keywords`` are optional.

        Returns:
            A dict that always contains ``status`` (``'up'``, ``'error'`` or
            ``'down'``) and ``attempt``. ``'up'`` and ``'error'`` results
            also carry ``status_code`` and ``response_time``; ``'error'``
            and ``'down'`` results carry ``message``.
        """
        advanced = self.config['advanced']
        # Always make at least one attempt so a result dict is returned
        # even when retry_count is configured as 0.
        max_retries = max(1, advanced['retry_count'])
        for attempt in range(1, max_retries + 1):
            try:
                start = time.time()
                response = requests.get(
                    site['url'],
                    timeout=10,
                    verify=True,
                    allow_redirects=True
                )
                response_time = time.time() - start
            except Exception as e:
                # Deliberately broad: any failure to fetch counts as "down".
                if attempt < max_retries:
                    time.sleep(2)  # back off briefly before retrying
                    continue
                return {
                    'status': 'down',
                    'message': str(e),
                    'attempt': attempt
                }

            details = {
                'status_code': response.status_code,
                'response_time': response_time,
                'attempt': attempt
            }
            if response.status_code not in site.get('expected_status', [200]):
                return {
                    'status': 'error',
                    'message': f'Unexpected status code: {response.status_code}',
                    **details
                }
            keywords = site.get('keywords', [])
            if not all(kw in response.text for kw in keywords):
                return {
                    'status': 'error',
                    'message': 'Required keywords not found',
                    **details
                }
            max_response_time = advanced['max_response_time']
            if response_time > max_response_time:
                # A too-slow response is reported explicitly so the caller
                # always receives a result dict, never None.
                return {
                    'status': 'error',
                    'message': f'Response too slow: {response_time:.2f}s '
                               f'(limit {max_response_time}s)',
                    **details
                }
            return {'status': 'up', **details}

    def send_telegram_alert(self, message):
        """Send ``message`` through the Telegram Bot API, if enabled.

        Failures are printed, not raised.

        Args:
            message: Text to send; it is sent with ``parse_mode`` HTML.
        """
        telegram = self.config['notifications']['telegram']
        if not telegram['enabled']:
            return
        url = f"https://api.telegram.org/bot{telegram['bot_token']}/sendMessage"
        data = {
            "chat_id": telegram['chat_id'],
            "text": message,
            "parse_mode": "HTML"
        }
        try:
            response = requests.post(url, data=data, timeout=10)
        except Exception as e:
            print(f"Telegram通知失败: {e}")
            return
        if not response.ok:
            # Report rejected requests too; only the status code is printed
            # because the request URL contains the bot token.
            print(f"Telegram通知失败: HTTP {response.status_code}")

    def send_webhook_alert(self, message):
        """POST ``message`` as JSON to the configured webhook, if enabled.

        Failures are printed, not raised.

        Args:
            message: Text placed in the ``text`` field of the payload.
        """
        webhook = self.config['notifications']['webhook']
        if not webhook['enabled']:
            return
        payload = {
            "text": message,
            "timestamp": datetime.now().isoformat()
        }
        try:
            response = requests.post(webhook['url'], json=payload, timeout=10)
        except Exception as e:
            print(f"Webhook通知失败: {e}")
            return
        if not response.ok:
            # Report rejected requests too, not only transport errors.
            print(f"Webhook通知失败: HTTP {response.status_code}")

    def monitor_loop(self):
        """Run the main monitoring loop until interrupted.

        Sites are checked one after another. When a site has failed
        ``down_threshold`` times in a row an alert is sent and its counter
        is reset; a recovery notice is sent when a site comes back up after
        a failed check. Returns immediately when no sites are configured.
        """
        print("🚀 高级监控系统启动")
        sites = self.config['sites']
        if not sites:
            # min() over an empty site list would raise ValueError below.
            print("没有配置任何站点,监控退出")
            return
        down_threshold = self.config['advanced']['down_threshold']
        # The configuration is not reloaded, so the interval is loop-invariant.
        min_interval = min(site.get('check_interval', 60) for site in sites)
        failure_counts = {site['url']: 0 for site in sites}
        last_checks = {}
        while True:
            for site in sites:
                result = self.check_with_retry(site)
                timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
                if result['status'] in ['down', 'error']:
                    failure_counts[site['url']] += 1
                    if failure_counts[site['url']] >= down_threshold:
                        message = f"""
⚠️ <b>网站宕机告警</b> ⚠️
站点: {site['name']}
URL: {site['url']}
时间: {timestamp}
状态: {result['status']}
错误: {result.get('message', 'N/A')}
尝试次数: {result['attempt']}
                        """
                        self.send_webhook_alert(message)
                        self.send_telegram_alert(message)
                        # Reset the counter to avoid alerting on every cycle.
                        failure_counts[site['url']] = 0
                    status_str = f"❌ {site['name']}: {result['status']}"
                else:
                    failure_counts[site['url']] = 0
                    status_str = f"✅ {site['name']}: UP ({result['response_time']:.2f}s)"
                    # Recovery notice: the previous check of this site failed.
                    if last_checks.get(site['url'], {}).get('status') in ['down', 'error']:
                        message = f"""
🟢 <b>网站已恢复</b> 🟢
站点: {site['name']}
URL: {site['url']}
时间: {timestamp}
响应时间: {result['response_time']:.2f}s
                        """
                        # Use the same channels as the outage alert.
                        self.send_webhook_alert(message)
                        self.send_telegram_alert(message)
                last_checks[site['url']] = result
                print(f"[{timestamp}] {status_str}")
            time.sleep(min_interval)
if __name__ == "__main__":
    monitor = AdvancedMonitor()
    try:
        monitor.monitor_loop()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the loop; exit without a traceback.
        print("\n监控停止")

快速使用指南

基础脚本使用

# Save the script and make it executable
chmod +x monitor.sh
# Add it to crontab so that it runs once per minute
crontab -e
# Crontab entry (this line goes into the editor opened by `crontab -e`, not the shell):
* * * * * /path/to/monitor.sh >> /var/log/site_monitor.log 2>&1

Python脚本使用

# Install the dependency
pip install requests
# Create the configuration file read by monitor.py
cat > sites.json << EOF
{
    "sites": [
        {"url": "https://your-site.com", "name": "Your Site", "check_interval": 60}
    ]
}
EOF
# Run the script
python3 monitor.py

系统服务化

# /etc/systemd/system/site-monitor.service
[Unit]
Description=Website Monitor Service
# Wait until the network is actually usable, not merely until networking has started.
Wants=network-online.target
After=network-online.target
[Service]
Type=simple
# NOTE: monitor.py writes monitor_YYYYMM.log into its working directory, so
# this user needs write access to /opt/monitor.
User=nobody
WorkingDirectory=/opt/monitor
# Unbuffered stdout so print() output reaches the journal immediately.
Environment=PYTHONUNBUFFERED=1
ExecStart=/usr/bin/python3 /opt/monitor/monitor.py
Restart=always
RestartSec=10
[Install]
WantedBy=multi-user.target
# Reload unit files so systemd sees the new service, then enable and start it
systemctl daemon-reload
systemctl enable site-monitor
systemctl start site-monitor

这些脚本可以根据你的需求进行修改和扩展。建议同时配合第三方监控服务(如 Uptime Robot、StatusCake 等)使用,以获得更可靠的多区域监控。

抱歉,评论功能暂时关闭!