错误报警脚本怎么做?

wen 实用脚本 61

本文目录导读:

错误报警脚本怎么做?

  1. Shell脚本 - 系统监控报警
  2. Python脚本 - 应用错误报警
  3. Go脚本 - 高性能监控报警
  4. 使用建议

我来帮你创建几种常见的错误报警脚本实现方案。

Shell脚本 - 系统监控报警

#!/bin/bash
# Error alert script - monitors system resources
# Configuration
# Recipient address handed to `mail` by send_alert.
ALERT_EMAIL="admin@example.com"
# Alert thresholds in percent; each check_* function alerts only when the
# measured value is strictly greater than its threshold.
CPU_THRESHOLD=90
MEMORY_THRESHOLD=90
DISK_THRESHOLD=90
# Logging helper: append one timestamped line to /var/log/alert.log.
#   $1 - message text
log() {
    local stamp
    stamp=$(date '+%Y-%m-%d %H:%M:%S')
    printf '[%s] %s\n' "$stamp" "$1" >> /var/log/alert.log
}
# Send an alert by e-mail and record the outcome in the alert log.
#   $1 - subject line
#   $2 - message body
# Returns 0 when `mail` accepted the message, 1 otherwise, so a delivery
# failure is no longer logged as a successful alert.
send_alert() {
    local subject="$1"
    local body="$2"
    # Alternative channel (DingTalk / WeCom webhook), kept as an example:
    # curl -X POST "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN" \
    #      -H "Content-Type: application/json" \
    #      -d "{\"msgtype\":\"text\",\"text\":{\"content\":\"$body\"}}"
    if echo "$body" | mail -s "$subject" "$ALERT_EMAIL"; then
        log "Alert sent: $subject"
    else
        log "Alert FAILED to send: $subject"
        return 1
    fi
}
# CPU check: alert when total CPU usage exceeds CPU_THRESHOLD (percent).
# Usage is derived as 100 - idle from the "Cpu(s)" summary line of `top`;
# reading only the first value on that line would report user-space time
# alone and miss system/iowait load.
# Returns 1 (after logging) when the idle figure cannot be parsed.
check_cpu() {
    local cpu_usage
    # LC_ALL=C keeps '.' as the decimal separator so the comma-split works.
    cpu_usage=$(LC_ALL=C top -bn1 | awk -F',' '/Cpu\(s\)/ {
        for (i = 1; i <= NF; i++) {
            if ($i ~ /id/) {
                gsub(/[^0-9.]/, "", $i)
                printf "%.1f\n", 100 - $i
                exit
            }
        }
    }')
    # An empty value would turn the bc comparison below into a syntax error.
    if [ -z "$cpu_usage" ]; then
        log "check_cpu: could not parse CPU usage from top output"
        return 1
    fi
    if (( $(echo "$cpu_usage > $CPU_THRESHOLD" | bc -l) )); then
        send_alert "CPU超限报警" "CPU使用率: ${cpu_usage}% (阈值: ${CPU_THRESHOLD}%)"
    fi
}
# Memory check: alert when used/total memory (from the "Mem" row of `free`)
# is greater than MEMORY_THRESHOLD percent.
check_memory() {
    local mem_usage over_limit
    mem_usage=$(free | awk '/Mem/ {print $3/$2 * 100.0}')
    # bc prints 1 when the comparison holds, 0 otherwise.
    over_limit=$(echo "$mem_usage > $MEMORY_THRESHOLD" | bc -l)
    if (( over_limit )); then
        send_alert "内存超限报警" "内存使用率: ${mem_usage}% (阈值: ${MEMORY_THRESHOLD}%)"
    fi
}
# Disk check: alert when the root filesystem's use% exceeds DISK_THRESHOLD.
# Returns 1 (after logging) when df output cannot be parsed as an integer.
check_disk() {
    local disk_usage
    # -P forces the POSIX one-line-per-filesystem layout, so a long device
    # name cannot wrap the row and shift the columns read by awk.
    disk_usage=$(df -P / | awk 'NR==2 {sub(/%/, "", $5); print $5}')
    # Guard the integer test below against empty or non-numeric input.
    case "$disk_usage" in
        ''|*[!0-9]*)
            log "check_disk: could not parse disk usage from df output"
            return 1
            ;;
    esac
    if [ "$disk_usage" -gt "$DISK_THRESHOLD" ]; then
        send_alert "磁盘空间报警" "根分区使用率: ${disk_usage}% (阈值: ${DISK_THRESHOLD}%)"
    fi
}
# Entry point: run every resource check once, in a fixed order.
main() {
    local check
    for check in check_cpu check_memory check_disk; do
        "$check"
    done
}
main

Python脚本 - 应用错误报警

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import logging
import smtplib
import requests
from email.mime.text import MIMEText
from datetime import datetime
class ErrorAlert:
    """Multi-channel error alerter for application health and log checks.

    Alerts go out by e-mail (SMTP with STARTTLS), to a Slack incoming
    webhook and to a DingTalk robot. A delivery failure on any channel is
    logged and never raised, so one broken channel cannot block the others.
    """

    # Seconds to wait on SMTP/HTTP delivery calls; without a timeout a stalled
    # endpoint would block the monitoring run indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, config=None):
        """Set up configuration and logging.

        Args:
            config: Optional mapping whose entries override the built-in
                placeholder settings (for example real SMTP credentials).
        """
        self.config = {
            'smtp_server': 'smtp.example.com',
            'smtp_port': 587,
            'smtp_user': 'alert@example.com',
            'smtp_password': 'your_password',
            'alert_email': 'admin@example.com',
            'webhook_url': 'https://hooks.slack.com/services/xxx',
            'dingtalk_token': 'your_dingtalk_token'
        }
        if config:
            self.config.update(config)

        # Fall back to console-only logging when the log file cannot be
        # opened (e.g. running unprivileged) instead of failing construction.
        handlers = [logging.StreamHandler()]
        file_handler_error = None
        try:
            handlers.insert(0, logging.FileHandler('/var/log/app_alert.log'))
        except OSError as e:
            file_handler_error = e
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=handlers
        )
        self.logger = logging.getLogger(__name__)
        if file_handler_error is not None:
            self.logger.warning(f"无法写入日志文件, 仅输出到控制台: {file_handler_error}")

    def send_email(self, subject, body):
        """Send the alert as a UTF-8 plain-text e-mail; failures are logged."""
        try:
            msg = MIMEText(body, 'plain', 'utf-8')
            msg['Subject'] = subject
            msg['From'] = self.config['smtp_user']
            msg['To'] = self.config['alert_email']
            with smtplib.SMTP(
                self.config['smtp_server'],
                self.config['smtp_port'],
                timeout=self.REQUEST_TIMEOUT,
            ) as server:
                server.starttls()
                server.login(self.config['smtp_user'], self.config['smtp_password'])
                server.send_message(msg)
            self.logger.info(f"邮件报警发送成功: {subject}")
        except Exception as e:
            # Best-effort channel: record the failure and carry on.
            self.logger.error(f"邮件报警发送失败: {e}")

    def send_slack(self, message):
        """Post the alert to the configured Slack webhook; failures are logged."""
        try:
            payload = {
                'text': f"⚠️ *错误报警* \n{message}",
                'channel': '#alerts',
                'username': 'Alert Bot'
            }
            response = requests.post(
                self.config['webhook_url'],
                json=payload,
                timeout=self.REQUEST_TIMEOUT,
            )
            if response.status_code == 200:
                self.logger.info("Slack报警发送成功")
            else:
                self.logger.error(f"Slack报警发送失败: {response.text}")
        except Exception as e:
            self.logger.error(f"Slack报警发送异常: {e}")

    def send_dingtalk(self, message):
        """Post the alert to the DingTalk robot API; failures are logged."""
        try:
            url = f"https://oapi.dingtalk.com/robot/send?access_token={self.config['dingtalk_token']}"
            payload = {
                "msgtype": "text",
                "text": {
                    "content": f"🔴 错误报警提醒\n{message}"
                },
                "at": {
                    "isAtAll": False
                }
            }
            headers = {'Content-Type': 'application/json'}
            response = requests.post(
                url,
                json=payload,
                headers=headers,
                timeout=self.REQUEST_TIMEOUT,
            )
            # Success is judged by the errcode field of the JSON reply.
            if response.json().get('errcode') == 0:
                self.logger.info("钉钉报警发送成功")
            else:
                self.logger.error(f"钉钉报警发送失败: {response.text}")
        except Exception as e:
            self.logger.error(f"钉钉报警发送异常: {e}")

    def check_application(self, url, timeout=5):
        """Probe *url* with a GET request and alert on any failure.

        Args:
            url: Health-check URL. An empty or missing URL is logged and
                reported as a failed check without sending an alert.
            timeout: Seconds to wait for the response.

        Returns:
            True when the endpoint answered with HTTP 200, otherwise False.
        """
        if not url:
            self.logger.error("应用检查失败: 未提供URL")
            return False
        try:
            response = requests.get(url, timeout=timeout)
            if response.status_code != 200:
                self.trigger_alert(
                    f"应用访问异常: {url}",
                    f"状态码: {response.status_code}\n响应时间: {response.elapsed.total_seconds()}s"
                )
                return False
            return True
        except requests.exceptions.Timeout:
            self.trigger_alert("应用超时", f"请求超时: {url}")
            return False
        except requests.exceptions.ConnectionError:
            self.trigger_alert("应用连接失败", f"无法连接: {url}")
            return False
        except requests.exceptions.RequestException as e:
            # Any other request failure (invalid URL, too many redirects, ...)
            # is still an unhealthy result, not a reason to crash the monitor.
            self.trigger_alert("应用请求异常", f"请求失败: {url}\n错误: {e}")
            return False

    def check_log_error(self, log_file, keywords):
        """Scan *log_file* for the first line containing any keyword.

        Scanning stops at the first match, so at most one alert is sent per
        call.

        Args:
            log_file: Path of the log file to read.
            keywords: Iterable of substrings to look for.

        Returns:
            True when a keyword was found (and an alert triggered); False
            when nothing matched or the file could not be read.
        """
        try:
            # errors='replace' keeps one undecodable byte from aborting the
            # scan and hiding every error that follows it.
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    for keyword in keywords:
                        if keyword in line:
                            self.trigger_alert(
                                f"日志发现错误关键词: {keyword}",
                                f"文件: {log_file}\n内容: {line.strip()}"
                            )
                            return True
            return False
        except Exception as e:
            self.logger.error(f"日志检查失败: {e}")
            return False

    def trigger_alert(self, title, message):
        """Timestamp the alert and send it through every channel."""
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        full_message = f"时间: {timestamp}\n标题: {title}\n详情: {message}"
        # Each sender handles its own errors, so all three are always tried.
        self.send_email(title, full_message)
        self.send_slack(full_message)
        self.send_dingtalk(full_message)
        self.logger.warning(f"报警触发: {title}")

    def run(self, check_type='application', **kwargs):
        """Run one monitoring check and return its result.

        Args:
            check_type: 'application' (needs ``url``) or 'log' (needs
                ``log_file``; optional ``keywords``).
            **kwargs: Arguments forwarded to the selected check.

        Returns:
            The selected check's boolean result. Note the two checks differ:
            'application' returns True when healthy, 'log' returns True when
            an error keyword was found. An unknown ``check_type`` is logged
            and yields False.
        """
        if check_type == 'application':
            return self.check_application(kwargs.get('url'))
        if check_type == 'log':
            return self.check_log_error(
                kwargs.get('log_file'),
                kwargs.get('keywords', ['ERROR', 'FATAL', 'Exception'])
            )
        self.logger.error(f"未知的检查类型: {check_type}")
        return False
if __name__ == "__main__":
    # Usage example: one HTTP health probe followed by one log scan.
    alert = ErrorAlert()
    health_check = {'url': 'http://localhost:8080/health'}
    log_check = {'log_file': '/var/log/app.log', 'keywords': ['ERROR', 'FATAL']}
    alert.run('application', **health_check)
    alert.run('log', **log_check)

Go脚本 - 高性能监控报警

package main
import (
    "fmt"
    "log"
    "mime"
    "net/http"
    "net/smtp"
    "os"
    "time"
)
// AlertConfig holds the delivery settings used when sending alerts.
type AlertConfig struct {
    // SmtpServer is the SMTP host; sendEmail joins it with SmtpPort as
    // "host:port" and also passes it to smtp.PlainAuth.
    SmtpServer  string
    // SmtpPort is kept as a string because it is concatenated into the
    // dial address.
    SmtpPort    string
    // EmailFrom is used both as the SMTP auth username and as the sender.
    EmailFrom   string
    // EmailTo is the single alert recipient.
    EmailTo     string
    // EmailPass is the SMTP auth password for EmailFrom.
    EmailPass   string
    // WebhookURL is not read by any code in this listing; the webhook call
    // in triggerAlert is commented out.
    WebhookURL  string
}
// Monitor periodically probes a list of service URLs and raises alerts.
type Monitor struct {
    // config carries the alert delivery settings.
    config   AlertConfig
    // logger receives every status and alert message.
    logger   *log.Logger
    // services is the list of URLs probed by Start; NewMonitor leaves it
    // empty, so callers assign it before calling Start.
    services []string
}
// NewMonitor builds a Monitor that uses config for alert delivery and logs
// to standard output with a "[Monitor] " prefix plus date and time.
// The returned Monitor has no services configured.
func NewMonitor(config AlertConfig) *Monitor {
    monitor := new(Monitor)
    monitor.config = config
    monitor.logger = log.New(os.Stdout, "[Monitor] ", log.Ldate|log.Ltime)
    return monitor
}
// CheckService issues a GET to url with a 5 second timeout.
// It returns false and triggers an alert when the request fails or the
// response status is 500 or above; any status below 500 counts as healthy.
func (m *Monitor) CheckService(url string) bool {
    httpClient := http.Client{Timeout: 5 * time.Second}
    response, reqErr := httpClient.Get(url)
    if reqErr != nil {
        m.triggerAlert(fmt.Sprintf("服务不可达: %s", url), reqErr.Error())
        return false
    }
    defer response.Body.Close()
    // Guard clause: everything below the 5xx range is treated as success.
    if response.StatusCode < 500 {
        return true
    }
    detail := fmt.Sprintf("状态码: %d", response.StatusCode)
    m.triggerAlert(fmt.Sprintf("服务错误: %s", url), detail)
    return false
}
// sendEmail delivers one alert e-mail through the configured SMTP server
// using PLAIN auth, and logs the outcome. Errors are logged, not returned.
//
// The message carries From, MIME-Version and a UTF-8 Content-Type header,
// and the subject is RFC 2047 encoded: the alert texts in this program are
// Chinese, and raw non-ASCII headers or an undeclared body charset can be
// shown garbled or rejected by mail servers.
func (m *Monitor) sendEmail(subject, body string) {
    auth := smtp.PlainAuth("", m.config.EmailFrom, m.config.EmailPass,
        m.config.SmtpServer)
    // Encode returns the subject unchanged when it is plain ASCII.
    encodedSubject := mime.BEncoding.Encode("UTF-8", subject)
    msg := fmt.Sprintf("From: %s\r\nTo: %s\r\nSubject: %s\r\n"+
        "MIME-Version: 1.0\r\n"+
        "Content-Type: text/plain; charset=UTF-8\r\n"+
        "Content-Transfer-Encoding: 8bit\r\n\r\n%s",
        m.config.EmailFrom, m.config.EmailTo, encodedSubject, body)
    err := smtp.SendMail(
        m.config.SmtpServer+":"+m.config.SmtpPort,
        auth,
        m.config.EmailFrom,
        []string{m.config.EmailTo},
        []byte(msg),
    )
    if err != nil {
        m.logger.Printf("邮件发送失败: %v", err)
    } else {
        m.logger.Println("邮件报警发送成功")
    }
}
// triggerAlert logs the alert and sends it by e-mail in a separate
// goroutine, so the caller does not wait for SMTP delivery.
func (m *Monitor) triggerAlert(subject, body string) {
    m.logger.Printf("报警触发: %s - %s", subject, body)
    // E-mail delivery runs in the background.
    go func() {
        m.sendEmail(subject, body)
    }()
    // Optional: also forward to a webhook.
    // m.sendWebhook(subject, body)
}
// Start runs the monitoring loop forever: it launches one goroutine per
// configured service to probe it, then sleeps 60 seconds before the next
// round. It never returns.
func (m *Monitor) Start() {
    const interval = 60 * time.Second
    m.logger.Println("监控服务启动...")
    for {
        for i := range m.services {
            go m.CheckService(m.services[i])
        }
        time.Sleep(interval)
    }
}
func main() {
    config := AlertConfig{
        SmtpServer: "smtp.gmail.com",
        SmtpPort:   "587",
        EmailFrom:  "alert@example.com",
        EmailTo:    "admin@example.com",
        EmailPass:  "your-password",
    }
    monitor := NewMonitor(config)
    monitor.services = []string{
        "http://localhost:8080/health",
        "http://localhost:3000/api/status",
    }
    monitor.Start()
}

使用建议

设置定时任务

# crontab -e
# Run the shell check every minute
* * * * * /path/to/alert_script.sh
# Run the Python check every 5 minutes
*/5 * * * * /usr/bin/python3 /path/to/alert.py
# Working hours only: Monday-Friday, every 5 minutes from 09:00 through 18:55
# (the hour range 9-18 includes the whole 18:xx hour)
*/5 9-18 * * 1-5 /path/to/alert.sh

报警级别

# 三级报警示例
def alert_level(message, level='warning'):
    """Sketch of a three-level alert table (info / warning / critical).

    Only the level table is built here; dispatching *message* to the
    channels of the chosen *level* is left unimplemented, so the function
    returns None.
    """
    level_rows = (
        ('info', 'ℹ️', ['log']),
        ('warning', '⚠️', ['log', 'email']),
        ('critical', '🚨', ['log', 'email', 'sms', 'phone']),
    )
    levels = {
        name: {'icon': icon, 'channels': channels}
        for name, icon, channels in level_rows
    }
    # Send the alert through the channels configured for the given level.

选择哪种脚本取决于你的具体需求:

  • Shell:简单快速,适合系统级监控
  • Python:功能丰富,适合应用级监控
  • Go:高性能,适合大规模监控系统

抱歉,评论功能暂时关闭!