本文目录导读:

我来帮你创建几种常见的错误报警脚本实现方案。
Shell脚本 - 系统监控报警
#!/bin/bash
# 错误报警脚本 - 监控系统资源
# 配置
# Alert settings. Each value may be overridden from the environment
# (e.g. `CPU_THRESHOLD=80 ./alert.sh`); the defaults are used otherwise.
# Recipient address passed to mail(1) by send_alert.
ALERT_EMAIL="${ALERT_EMAIL:-admin@example.com}"
# Usage percentages above which the matching check raises an alert.
CPU_THRESHOLD="${CPU_THRESHOLD:-90}"
MEMORY_THRESHOLD="${MEMORY_THRESHOLD:-90}"
DISK_THRESHOLD="${DISK_THRESHOLD:-90}"
#######################################
# Append a timestamped line to the alert log.
# Globals:   ALERT_LOG_FILE (read, optional) - log path; defaults to
#            /var/log/alert.log
# Arguments: $1 - message text
# Outputs:   appends "[YYYY-mm-dd HH:MM:SS] message" to the log file; if
#            the file cannot be written the line goes to stderr instead,
#            so the message is not lost when running unprivileged.
#######################################
log() {
  local log_file="${ALERT_LOG_FILE:-/var/log/alert.log}"
  local line
  line="[$(date '+%Y-%m-%d %H:%M:%S')] $1"
  # The braces keep the shell's own "cannot open file" diagnostic quiet;
  # the fallback below reports the message itself.
  if ! { printf '%s\n' "$line" >> "$log_file"; } 2>/dev/null; then
    printf '%s\n' "$line" >&2
  fi
}
#######################################
# Send an alert by e-mail and record the outcome in the log.
# Globals:   ALERT_EMAIL (read) - recipient address
# Arguments: $1 - mail subject
#            $2 - mail body
# Returns:   0 when mail(1) accepted the message, 1 otherwise.
#######################################
send_alert() {
  local subject="$1"
  local body="$2"

  # Alternative channel (DingTalk / WeCom robot), kept as an example.
  # Note: $body is interpolated into the JSON unescaped.
  # curl -X POST "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN" \
  # -H "Content-Type: application/json" \
  # -d "{\"msgtype\":\"text\",\"text\":{\"content\":\"$body\"}}"

  # Only claim success when mail actually exited 0; a missing or failing
  # mail command must not be logged as a delivered alert.
  if printf '%s\n' "$body" | mail -s "$subject" "$ALERT_EMAIL"; then
    log "Alert sent: $subject"
  else
    log "Alert send FAILED: $subject"
    return 1
  fi
}
#######################################
# Alert when total CPU utilisation exceeds CPU_THRESHOLD.
# Utilisation is computed as 100 minus the idle figure on top's "Cpu(s)"
# summary line, so system/iowait time counts as well as user time, and
# the parse does not depend on the spacing after "Cpu(s):".
# Globals:   CPU_THRESHOLD (read)
# Returns:   1 if no usage figure could be parsed from top's output.
#######################################
check_cpu() {
  local cpu_usage
  # Handles both "98.3 id," and the older "98.3%id," layouts; a decimal
  # comma is normalised to a dot.
  cpu_usage=$(
    LC_ALL=C top -bn1 | LC_ALL=C awk '
      /Cpu\(s\)/ {
        if (match($0, /[0-9]+([.,][0-9]+)?[ %]*id/)) {
          idle = substr($0, RSTART, RLENGTH)
          sub(/[ %]*id$/, "", idle)
          sub(/,/, ".", idle)
          printf "%.1f\n", 100 - idle
          exit
        }
      }'
  )

  # Refuse to compare an empty or malformed value instead of letting the
  # comparison fail with a syntax error.
  if [[ ! "$cpu_usage" =~ ^[0-9]+(\.[0-9]+)?$ ]]; then
    printf 'check_cpu: unable to parse CPU usage from top output\n' >&2
    return 1
  fi

  # awk performs the floating-point comparison, so bc is not required.
  if LC_ALL=C awk -v usage="$cpu_usage" -v limit="$CPU_THRESHOLD" \
    'BEGIN { exit !(usage + 0 > limit + 0) }'; then
    send_alert "CPU超限报警" "CPU使用率: ${cpu_usage}% (阈值: ${CPU_THRESHOLD}%)"
  fi
}
#######################################
# Alert when memory usage (used / total from free's "Mem:" row) exceeds
# MEMORY_THRESHOLD.
# Globals:   MEMORY_THRESHOLD (read)
# Returns:   1 if no usage figure could be parsed from free's output.
#######################################
check_memory() {
  local mem_usage
  # LC_ALL=C pins the row label to "Mem:"; translated locales print a
  # different label and the row would not be found. The "$2 > 0" guard
  # avoids a division by zero.
  mem_usage=$(
    LC_ALL=C free | LC_ALL=C awk '
      /^Mem:/ && $2 > 0 { printf "%.1f\n", $3 / $2 * 100; exit }'
  )

  # Refuse to compare an empty or malformed value instead of letting the
  # comparison fail with a syntax error.
  if [[ ! "$mem_usage" =~ ^[0-9]+(\.[0-9]+)?$ ]]; then
    printf 'check_memory: unable to parse memory usage from free output\n' >&2
    return 1
  fi

  # awk performs the floating-point comparison, so bc is not required.
  if LC_ALL=C awk -v usage="$mem_usage" -v limit="$MEMORY_THRESHOLD" \
    'BEGIN { exit !(usage + 0 > limit + 0) }'; then
    send_alert "内存超限报警" "内存使用率: ${mem_usage}% (阈值: ${MEMORY_THRESHOLD}%)"
  fi
}
#######################################
# Alert when the root filesystem's used percentage exceeds DISK_THRESHOLD.
# Globals:   DISK_THRESHOLD (read)
# Returns:   1 if no percentage could be parsed from df's output.
#######################################
check_disk() {
  local disk_usage
  # -P selects the POSIX output format: one line per filesystem, so a
  # long device name cannot push the percentage onto a third line.
  disk_usage=$(
    LC_ALL=C df -P / | LC_ALL=C awk 'NR == 2 { sub(/%$/, "", $5); print $5 }'
  )

  # An empty or non-integer value would make the -gt test below error out.
  if [[ ! "$disk_usage" =~ ^[0-9]+$ ]]; then
    printf 'check_disk: unable to parse disk usage from df output\n' >&2
    return 1
  fi

  if [ "$disk_usage" -gt "$DISK_THRESHOLD" ]; then
    send_alert "磁盘空间报警" "根分区使用率: ${disk_usage}% (阈值: ${DISK_THRESHOLD}%)"
  fi
}
#######################################
# Entry point: run the CPU, memory and disk checks once, in that order.
# Returns: the exit status of the last check (check_disk).
#######################################
main() {
  local check
  for check in check_cpu check_memory check_disk; do
    "$check"
  done
}

main "$@"
Python脚本 - 应用错误报警
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import sys
import json
import time
import logging
import smtplib
import requests
from email.mime.text import MIMEText
from datetime import datetime
class ErrorAlert:
    """Send error alerts over e-mail, Slack and DingTalk, and run simple checks.

    Two checks are provided: an HTTP probe (``check_application``) and a
    keyword scan of a log file (``check_log_error``). When a check fails it
    calls ``trigger_alert``, which sends the alert through every channel.
    """

    # Upper bound, in seconds, for each outgoing SMTP / webhook call so that
    # an unresponsive endpoint cannot block the monitor indefinitely.
    NETWORK_TIMEOUT = 10

    # File that receives the log output in addition to the console.
    LOG_FILE = '/var/log/app_alert.log'

    def __init__(self, config=None):
        """Build the configuration and set up logging.

        Args:
            config: Optional dict whose entries override the built-in
                placeholder settings (same keys as ``self.config``). Use it
                to supply real credentials instead of editing this file.
        """
        # Placeholder settings; override via ``config``.
        self.config = {
            'smtp_server': 'smtp.example.com',
            'smtp_port': 587,
            'smtp_user': 'alert@example.com',
            'smtp_password': 'your_password',
            'alert_email': 'admin@example.com',
            'webhook_url': 'https://hooks.slack.com/services/xxx',
            'dingtalk_token': 'your_dingtalk_token'
        }
        if config:
            self.config.update(config)

        # Log to console always; add the file handler only if the file can be
        # opened, so an unprivileged run does not crash in the constructor.
        handlers = [logging.StreamHandler()]
        file_error = None
        try:
            handlers.insert(0, logging.FileHandler(self.LOG_FILE))
        except OSError as e:
            file_error = e
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(levelname)s - %(message)s',
            handlers=handlers
        )
        self.logger = logging.getLogger(__name__)
        if file_error is not None:
            self.logger.warning(f"无法写入日志文件 {self.LOG_FILE}, 仅输出到控制台: {file_error}")

    def send_email(self, subject, body):
        """Send the alert by e-mail over SMTP with STARTTLS.

        Failures are logged, never raised, so the other channels still run.
        """
        try:
            msg = MIMEText(body, 'plain', 'utf-8')
            msg['Subject'] = subject
            msg['From'] = self.config['smtp_user']
            msg['To'] = self.config['alert_email']
            with smtplib.SMTP(
                self.config['smtp_server'],
                self.config['smtp_port'],
                timeout=self.NETWORK_TIMEOUT
            ) as server:
                server.starttls()
                server.login(self.config['smtp_user'], self.config['smtp_password'])
                server.send_message(msg)
            self.logger.info(f"邮件报警发送成功: {subject}")
        except Exception as e:
            self.logger.error(f"邮件报警发送失败: {e}")

    def send_slack(self, message):
        """Post the alert to the configured Slack webhook URL.

        Failures are logged, never raised.
        """
        try:
            payload = {
                'text': f"⚠️ *错误报警* \n{message}",
                'channel': '#alerts',
                'username': 'Alert Bot'
            }
            response = requests.post(
                self.config['webhook_url'],
                json=payload,
                timeout=self.NETWORK_TIMEOUT
            )
            if response.status_code == 200:
                self.logger.info("Slack报警发送成功")
            else:
                self.logger.error(f"Slack报警发送失败: {response.text}")
        except Exception as e:
            self.logger.error(f"Slack报警发送异常: {e}")

    def send_dingtalk(self, message):
        """Post the alert to the DingTalk robot API.

        Success is determined by ``errcode == 0`` in the JSON reply.
        Failures are logged, never raised.
        """
        try:
            url = f"https://oapi.dingtalk.com/robot/send?access_token={self.config['dingtalk_token']}"
            payload = {
                "msgtype": "text",
                "text": {
                    "content": f"🔴 错误报警提醒\n{message}"
                },
                "at": {
                    "isAtAll": False
                }
            }
            headers = {'Content-Type': 'application/json'}
            response = requests.post(
                url, json=payload, headers=headers, timeout=self.NETWORK_TIMEOUT
            )
            if response.json().get('errcode') == 0:
                self.logger.info("钉钉报警发送成功")
            else:
                self.logger.error(f"钉钉报警发送失败: {response.text}")
        except Exception as e:
            self.logger.error(f"钉钉报警发送异常: {e}")

    def check_application(self, url, timeout=5):
        """Probe ``url`` with an HTTP GET and alert unless it answers 200.

        Args:
            url: Address to request.
            timeout: Seconds to wait for the response.

        Returns:
            True if the response status is 200, False otherwise (an alert has
            been triggered in that case).
        """
        try:
            response = requests.get(url, timeout=timeout)
        except requests.exceptions.Timeout:
            self.trigger_alert("应用超时", f"请求超时: {url}")
            return False
        except requests.exceptions.ConnectionError:
            self.trigger_alert("应用连接失败", f"无法连接: {url}")
            return False
        except requests.exceptions.RequestException as e:
            # Any other request failure (invalid or missing URL, too many
            # redirects, ...) is an alert too, not a crash of the monitor.
            self.trigger_alert("应用检查失败", f"请求异常: {url} ({e})")
            return False

        if response.status_code != 200:
            self.trigger_alert(
                f"应用访问异常: {url}",
                f"状态码: {response.status_code}\n响应时间: {response.elapsed.total_seconds()}s"
            )
            return False
        return True

    def check_log_error(self, log_file, keywords):
        """Scan ``log_file`` for any of ``keywords`` and alert on the first hit.

        Scanning stops at the first matching line.

        Args:
            log_file: Path of the text file to scan.
            keywords: Iterable of substrings to look for.

        Returns:
            True if a keyword was found (an alert has been triggered), False
            if nothing matched or the file could not be read.
        """
        try:
            # errors='replace' keeps a stray non-UTF-8 byte from aborting the
            # whole scan.
            with open(log_file, 'r', encoding='utf-8', errors='replace') as f:
                for line in f:
                    for keyword in keywords:
                        if keyword in line:
                            self.trigger_alert(
                                f"日志发现错误关键词: {keyword}",
                                f"文件: {log_file}\n内容: {line.strip()}"
                            )
                            return True
            return False
        except Exception as e:
            self.logger.error(f"日志检查失败: {e}")
            return False

    def trigger_alert(self, title, message):
        """Send one alert through every channel (e-mail, Slack, DingTalk).

        Each sender handles its own errors, so one failing channel does not
        stop the others.
        """
        timestamp = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        full_message = f"时间: {timestamp}\n标题: {title}\n详情: {message}"
        self.send_email(title, full_message)
        self.send_slack(full_message)
        self.send_dingtalk(full_message)
        self.logger.warning(f"报警触发: {title}")

    def run(self, check_type='application', **kwargs):
        """Run one check selected by ``check_type``.

        Args:
            check_type: 'application' (needs ``url``) or 'log' (needs
                ``log_file``; optional ``keywords``, defaulting to
                ERROR / FATAL / Exception).

        Returns:
            The boolean result of the selected check, or None when
            ``check_type`` is not recognised (which is also logged).
        """
        if check_type == 'application':
            return self.check_application(kwargs.get('url'))
        if check_type == 'log':
            return self.check_log_error(
                kwargs.get('log_file'),
                kwargs.get('keywords', ['ERROR', 'FATAL', 'Exception'])
            )
        self.logger.error(f"未知的检查类型: {check_type}")
        return None
if __name__ == "__main__":
    # Usage example: run both checks once.
    alert = ErrorAlert()
    # Probe the application's health endpoint.
    alert.run('application', url='http://localhost:8080/health')
    # Scan the application log for error keywords.
    alert.run('log', log_file='/var/log/app.log', keywords=['ERROR', 'FATAL'])
Go脚本 - 高性能监控报警
package main
import (
"fmt"
"log"
"net/http"
"net/smtp"
"os"
"time"
)
// AlertConfig carries the settings needed to deliver alert notifications.
type AlertConfig struct {
	// SMTP relay; host and port are joined as "host:port" when sending.
	SmtpServer, SmtpPort string
	// EmailFrom is both the SMTP login name and the envelope sender,
	// EmailTo the single recipient, EmailPass the SMTP password.
	EmailFrom, EmailTo, EmailPass string
	// WebhookURL is reserved for a webhook channel; the webhook call in
	// triggerAlert is currently commented out.
	WebhookURL string
}
// Monitor polls a list of service URLs and raises alerts on failures.
type Monitor struct {
	// config holds the alert delivery settings.
	config AlertConfig
	// logger receives the monitor's status and alert messages.
	logger *log.Logger
	// services lists the URLs probed on every cycle of Start.
	services []string
}
// NewMonitor returns a Monitor that uses config for alert delivery and logs
// to standard output with a "[Monitor] " prefix plus date and time. The
// service list starts out empty.
func NewMonitor(config AlertConfig) *Monitor {
	m := new(Monitor)
	m.config = config
	m.logger = log.New(os.Stdout, "[Monitor] ", log.Ldate|log.Ltime)
	return m
}
// CheckService issues a GET request to url with a 5-second timeout.
// It triggers an alert and returns false when the request fails or the
// response status is 500 or above; any other status counts as healthy.
func (m *Monitor) CheckService(url string) bool {
	httpClient := http.Client{Timeout: 5 * time.Second}

	response, reqErr := httpClient.Get(url)
	if reqErr != nil {
		m.triggerAlert(fmt.Sprintf("服务不可达: %s", url), reqErr.Error())
		return false
	}
	defer response.Body.Close()

	healthy := response.StatusCode < 500
	if !healthy {
		title := fmt.Sprintf("服务错误: %s", url)
		detail := fmt.Sprintf("状态码: %d", response.StatusCode)
		m.triggerAlert(title, detail)
	}
	return healthy
}
// sendEmail delivers one alert e-mail through the configured SMTP server,
// authenticating with PLAIN auth as EmailFrom. The outcome is only logged;
// nothing is returned to the caller.
func (m *Monitor) sendEmail(subject, body string) {
	auth := smtp.PlainAuth("", m.config.EmailFrom, m.config.EmailPass,
		m.config.SmtpServer)

	// smtp.SendMail sends msg verbatim, so every header has to be written
	// here. A From header is included alongside To and Subject, and the
	// body is declared as UTF-8 text because the alert texts are Chinese.
	// NOTE(review): subject is written as raw UTF-8 and is not checked for
	// CR/LF; RFC 2047 encoding (mime.QEncoding) would need an extra import.
	msg := fmt.Sprintf(
		"From: %s\r\nTo: %s\r\nSubject: %s\r\n"+
			"MIME-Version: 1.0\r\n"+
			"Content-Type: text/plain; charset=UTF-8\r\n"+
			"Content-Transfer-Encoding: 8bit\r\n"+
			"\r\n%s",
		m.config.EmailFrom, m.config.EmailTo, subject, body)

	err := smtp.SendMail(
		m.config.SmtpServer+":"+m.config.SmtpPort,
		auth,
		m.config.EmailFrom,
		[]string{m.config.EmailTo},
		[]byte(msg),
	)
	if err != nil {
		m.logger.Printf("邮件发送失败: %v", err)
	} else {
		m.logger.Println("邮件报警发送成功")
	}
}
// triggerAlert logs the alert and then sends the e-mail in a separate
// goroutine, so the caller does not wait for SMTP delivery.
func (m *Monitor) triggerAlert(subject, body string) {
	m.logger.Printf("报警触发: %s - %s", subject, body)

	go func() {
		m.sendEmail(subject, body)
	}()

	// Optional: also deliver to a webhook.
	// m.sendWebhook(subject, body)
}
// Start runs the monitoring loop forever: each cycle launches one
// CheckService goroutine per configured service, then sleeps 60 seconds.
// It never returns.
func (m *Monitor) Start() {
	m.logger.Println("监控服务启动...")

	for ; ; time.Sleep(60 * time.Second) {
		targets := m.services
		for i := 0; i < len(targets); i++ {
			go m.CheckService(targets[i])
		}
	}
}
// main wires up a Monitor with placeholder SMTP settings and two local
// service URLs, then runs the monitoring loop (which does not return).
func main() {
	var config AlertConfig
	config.SmtpServer = "smtp.gmail.com"
	config.SmtpPort = "587"
	config.EmailFrom = "alert@example.com"
	config.EmailTo = "admin@example.com"
	config.EmailPass = "your-password"

	monitor := NewMonitor(config)
	monitor.services = append([]string(nil),
		"http://localhost:8080/health",
		"http://localhost:3000/api/status",
	)
	monitor.Start()
}
使用建议
设置定时任务
# 编辑定时任务:crontab -e
# 每分钟检查一次
* * * * * /path/to/alert_script.sh
# 每5分钟检查一次
*/5 * * * * /usr/bin/python3 /path/to/alert.py
# 工作时间检查(周一至周五 9:00-18:55,每5分钟一次)
*/5 9-18 * * 1-5 /path/to/alert.sh
报警级别
# 三级报警示例
def alert_level(message, level='warning'):
    """Resolve the icon and delivery channels for an alert of a given level.

    Args:
        message: Alert text.
        level: One of 'info', 'warning' or 'critical'.

    Returns:
        A dict with 'level', 'text' (the icon followed by the message) and
        'channels' (the channel names the alert should be sent through), so
        the caller can dispatch to each channel.

    Raises:
        ValueError: If ``level`` is not a known level.
    """
    levels = {
        'info': {'icon': 'ℹ️', 'channels': ['log']},
        'warning': {'icon': '⚠️', 'channels': ['log', 'email']},
        'critical': {'icon': '🚨', 'channels': ['log', 'email', 'sms', 'phone']},
    }
    if level not in levels:
        raise ValueError(f"unknown alert level: {level!r}")

    selected = levels[level]
    # Hand back the routing for this level; actual delivery per channel is
    # left to the caller.
    return {
        'level': level,
        'text': f"{selected['icon']} {message}",
        'channels': list(selected['channels']),
    }
选择哪种脚本取决于你的具体需求:
- Shell:简单快速,适合系统级监控
- Python:功能丰富,适合应用级监控
- Go:高性能,适合大规模监控系统