Python案例如何推送自定义消息?

wen python案例 65

本文目录导读:

Python案例如何推送自定义消息?

  1. 使用WebSocket实现实时推送
  2. 使用HTTP请求推送(RESTful API)
  3. 使用消息队列(RabbitMQ)
  4. 使用Redis发布/订阅
  5. 使用第三方推送服务(OneSignal示例)
  6. 创建简单的推送框架
  7. 选择建议

在Python中推送自定义消息有多种方式,具体取决于你的应用场景,以下是几种常见的实现方案:

使用WebSocket实现实时推送

安装依赖

pip install websockets asyncio

服务端代码

import asyncio
import websockets
import json
async def handler(websocket, path):
    try:
        async for message in websocket:
            print(f"收到消息: {message}")
            # 向客户端发送响应
            await websocket.send(json.dumps({
                "status": "success",
                "data": "消息已接收"
            }))
    except websockets.exceptions.ConnectionClosed:
        print("连接关闭")
async def main():
    async with websockets.serve(handler, "localhost", 8765):
        print("WebSocket服务运行在 ws://localhost:8765")
        await asyncio.Future()  # 永远运行
# 推送自定义消息
async def push_message(message):
    async with websockets.connect("ws://localhost:8765") as websocket:
        await websocket.send(json.dumps(message))
        response = await websocket.recv()
        return response
if __name__ == "__main__":
    asyncio.run(main())

使用HTTP请求推送(RESTful API)

服务端(Flask)

from flask import Flask, request, jsonify
import requests
app = Flask(__name__)
# 存储订阅者
subscribers = []
@app.route('/subscribe', methods=['POST'])
def subscribe():
    data = request.json
    subscribers.append(data['url'])
    return jsonify({"status": "success"})
@app.route('/push', methods=['POST'])
def push():
    message = request.json
    failed_subscribers = []
    for url in subscribers:
        try:
            response = requests.post(url, json=message, timeout=5)
        except:
            failed_subscribers.append(url)
    return jsonify({
        "failed_count": len(failed_subscribers),
        "failed_subscribers": failed_subscribers
    })
if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)

客户端

import requests
import json
def push_custom_message():
    message = {
        "title": "系统通知",
        "content": "这是一条自定义推送消息",
        "type": "info",
        "timestamp": "2024-01-01 12:00:00",
        "priority": "high"
    }
    # 推送到WebSocket服务器
    response = requests.post(
        "http://localhost:5000/push",
        json=message
    )
    print(f"推送结果: {response.json()}")
if __name__ == "__main__":
    push_custom_message()

使用消息队列(RabbitMQ)

安装依赖

pip install pika

生产者(推送消息)

import pika
import json
def push_to_rabbitmq(message):
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    # 声明队列
    channel.queue_declare(queue='custom_messages', durable=True)
    # 发送消息
    channel.basic_publish(
        exchange='',
        routing_key='custom_messages',
        body=json.dumps(message),
        properties=pika.BasicProperties(
            delivery_mode=2,  # 持久化消息
        )
    )
    print(f"消息已发送: {message}")
    connection.close()
if __name__ == "__main__":
    message = {
        "event": "system_alert",
        "severity": "warning",
        "message": "系统资源使用率超过90%"
    }
    push_to_rabbitmq(message)

消费者(接收消息)

import pika
import json
def callback(ch, method, properties, body):
    message = json.loads(body)
    print(f"收到消息: {message}")
    # 处理消息...
    ch.basic_ack(delivery_tag=method.delivery_tag)
def start_consumer():
    connection = pika.BlockingConnection(
        pika.ConnectionParameters('localhost')
    )
    channel = connection.channel()
    channel.queue_declare(queue='custom_messages', durable=True)
    channel.basic_qos(prefetch_count=1)
    channel.basic_consume(
        queue='custom_messages',
        on_message_callback=callback
    )
    print('等待消息...')
    channel.start_consuming()
if __name__ == "__main__":
    start_consumer()

使用Redis发布/订阅

安装依赖

pip install redis

发布者

import redis
import json
# 连接Redis
r = redis.Redis(host='localhost', port=6379, db=0)
def push_via_redis(channel, message):
    # 发布消息到指定频道
    r.publish(channel, json.dumps(message))
    print(f"消息已发布到频道 {channel}")
if __name__ == "__main__":
    message = {
        "type": "user_activity",
        "user_id": 12345,
        "action": "login",
        "timestamp": "2024-01-01 12:00:00"
    }
    # 推送到多个频道
    channels = ['system_alerts', 'user_activity']
    for channel in channels:
        push_via_redis(channel, message)

订阅者

import redis
import json
def listen_to_channel(channel):
    r = redis.Redis(host='localhost', port=6379, db=0)
    pubsub = r.pubsub()
    pubsub.subscribe(channel)
    print(f"监听频道: {channel}")
    for message in pubsub.listen():
        if message['type'] == 'message':
            data = json.loads(message['data'])
            print(f"收到消息: {data}")
            # 处理消息...
if __name__ == "__main__":
    listen_to_channel('system_alerts')

使用第三方推送服务(OneSignal示例)

安装依赖

pip install onesignal-sdk
from onesignal_sdk.client import Client
from onesignal_sdk.error import OneSignalError
def push_via_onesignal():
    # 初始化OneSignal客户端
    client = Client(
        app_id="your_app_id",
        rest_api_key="your_rest_api_key"
    )
    # 创建通知内容
    notification_body = {
        "contents": {
            "en": "这是一条自定义推送消息",
            "zh": "这是一条自定义推送消息"
        },
        "headings": {
            "en": "系统通知",
            "zh": "系统通知"
        },
        "include_player_ids": ["player_id_1", "player_id_2"],  # 指定用户
        "data": {
            "type": "custom_notification",
            "action": "open_url",
            "url": "https://example.com"
        },
        "ios_badgeType": "Increase",
        "ios_badgeCount": 1
    }
    try:
        response = client.send_notification(notification_body)
        print(f"推送成功: {response.body}")
    except OneSignalError as e:
        print(f"推送失败: {e}")
if __name__ == "__main__":
    push_via_onesignal()

创建简单的推送框架

import json
import threading
import queue
from typing import Callable, Dict, Any
from datetime import datetime
class CustomPushFramework:
    def __init__(self):
        self.channels = {}  # 存储通道和订阅者
        self.message_queue = queue.Queue()
        self.running = False
    def subscribe(self, channel: str, callback: Callable):
        """订阅消息通道"""
        if channel not in self.channels:
            self.channels[channel] = []
        self.channels[channel].append(callback)
        print(f"已订阅通道: {channel}")
    def unsubscribe(self, channel: str, callback: Callable):
        """取消订阅"""
        if channel in self.channels:
            self.channels[channel].remove(callback)
    def publish(self, channel: str, message: Any):
        """发布消息到通道"""
        message_with_meta = {
            "channel": channel,
            "data": message,
            "timestamp": datetime.now().isoformat(),
            "message_id": id(message)
        }
        self.message_queue.put(message_with_meta)
        print(f"消息已入队: {message_with_meta}")
    def process_messages(self):
        """处理消息队列中的消息"""
        while self.running:
            try:
                message = self.message_queue.get(timeout=1)
                channel = message["channel"]
                if channel in self.channels:
                    for callback in self.channels[channel]:
                        try:
                            callback(message)
                        except Exception as e:
                            print(f"消息处理错误: {e}")
            except queue.Empty:
                continue
    def start(self):
        """启动推送框架"""
        self.running = True
        worker = threading.Thread(target=self.process_messages)
        worker.daemon = True
        worker.start()
        print("推送框架已启动")
    def stop(self):
        """停止推送框架"""
        self.running = False
        print("推送框架已停止")
# 使用示例
def main():
    # 创建推送框架实例
    push_framework = CustomPushFramework()
    push_framework.start()
    # 定义消息处理器
    def email_handler(message):
        print(f"[邮件通知] {message['data']}")
    def sms_handler(message):
        print(f"[短信通知] {message['data']}")
    def webhook_handler(message):
        print(f"[Webhook通知] {message['data']}")
    # 订阅通道
    push_framework.subscribe("notifications", email_handler)
    push_framework.subscribe("alerts", sms_handler)
    push_framework.subscribe("webhooks", webhook_handler)
    # 推送自定义消息
    push_framework.publish("notifications", {
        "title": "新消息通知",
        "content": "您有一条新的系统消息"
    })
    push_framework.publish("alerts", {
        "level": "warning",
        "message": "服务器CPU使用率过高"
    })
    push_framework.publish("webhooks", {
        "event": "user.created",
        "user_id": 12345
    })
    # 等待处理完成
    import time
    time.sleep(2)
    push_framework.stop()
if __name__ == "__main__":
    main()

选择建议

  • WebSocket:适合需要实时双向通信的场景
  • HTTP API:适合简单的请求-响应模式
  • 消息队列:适合高并发、需要消息持久化的场景
  • Redis Pub/Sub:适合轻量级的发布订阅模式
  • 第三方服务:适合需要跨平台推送的场景

根据你的具体需求选择合适的方案,如果需要更多针对特定场景的示例,请告诉我!

抱歉,评论功能暂时关闭!