Python案例怎么搭建长连接服务?

wen python案例 71

本文目录导读:

Python案例怎么搭建长连接服务?

  1. 使用WebSocket搭建长连接服务
  2. 使用Socket搭建TCP长连接服务
  3. 使用FastAPI + WebSocket
  4. 生产环境建议
  5. 关键要点

我来详细介绍如何用Python搭建长连接服务,主要基于WebSocket和原始Socket两种方式。

使用WebSocket搭建长连接服务

安装依赖

pip install websockets

服务端代码

import asyncio
import websockets
import json
from datetime import datetime
class WebSocketServer:
    def __init__(self, host='localhost', port=8765):
        self.host = host
        self.port = port
        self.clients = set()  # 存储所有客户端连接
    async def handler(self, websocket, path):
        """处理客户端连接"""
        # 添加客户端到集合
        self.clients.add(websocket)
        client_addr = websocket.remote_address
        print(f"[{datetime.now()}] 新客户端连接: {client_addr}")
        try:
            async for message in websocket:
                print(f"收到消息: {message}")
                # 处理客户端消息
                response = await self.process_message(websocket, message)
                # 发送响应
                await websocket.send(json.dumps(response))
        except websockets.exceptions.ConnectionClosed:
            print(f"客户端断开: {client_addr}")
        finally:
            # 移除断开的客户端
            self.clients.remove(websocket)
    async def process_message(self, websocket, message):
        """处理消息逻辑"""
        try:
            data = json.loads(message)
            msg_type = data.get('type', '')
            if msg_type == 'ping':
                return {'type': 'pong', 'timestamp': datetime.now().isoformat()}
            elif msg_type == 'echo':
                return {'type': 'echo', 'data': data.get('data', '')}
            else:
                return {'type': 'unknown', 'message': '未知消息类型'}
        except json.JSONDecodeError:
            return {'type': 'error', 'message': '无效的JSON格式'}
    async def broadcast(self, message):
        """广播消息给所有客户端"""
        if self.clients:
            print(f"广播消息给 {len(self.clients)} 个客户端")
            # 创建任务并行发送
            tasks = [client.send(json.dumps(message)) for client in self.clients]
            await asyncio.gather(*tasks, return_exceptions=True)
    async def start(self):
        """启动服务器"""
        print(f"WebSocket服务器启动在 ws://{self.host}:{self.port}")
        async with websockets.serve(self.handler, self.host, self.port):
            await asyncio.Future()  # 运行直到被中断
# 运行服务器
async def main():
    server = WebSocketServer()
    await server.start()
if __name__ == "__main__":
    asyncio.run(main())

客户端代码

import asyncio
import websockets
import json
import time
async def client():
    uri = "ws://localhost:8765"
    async with websockets.connect(uri) as websocket:
        print("已连接到服务器")
        # 发送心跳
        await websocket.send(json.dumps({'type': 'ping'}))
        response = await websocket.recv()
        print(f"收到: {response}")
        # 发送回声消息
        await websocket.send(json.dumps({
            'type': 'echo', 
            'data': 'Hello WebSocket!'
        }))
        response = await websocket.recv()
        print(f"收到: {response}")
        # 保持连接并定期发送心跳
        while True:
            await asyncio.sleep(5)
            await websocket.send(json.dumps({'type': 'ping'}))
            response = await websocket.recv()
            print(f"心跳响应: {response}")
if __name__ == "__main__":
    asyncio.run(client())

使用Socket搭建TCP长连接服务

服务端代码

import socket
import threading
import json
from datetime import datetime
class TCPServer:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.clients = {}  # 存储客户端连接: {client_id: (conn, addr)}
        self.lock = threading.Lock()  # 线程锁
    def handle_client(self, conn, addr):
        """处理单个客户端连接"""
        client_id = f"{addr[0]}:{addr[1]}"
        print(f"[{datetime.now()}] 新连接: {client_id}")
        with self.lock:
            self.clients[client_id] = (conn, addr)
        try:
            while True:
                # 接收数据
                data = conn.recv(1024)
                if not data:
                    break
                message = data.decode('utf-8')
                print(f"收到来自 {client_id} 的消息: {message}")
                # 处理消息
                response = self.process_message(client_id, message)
                # 发送响应
                conn.send(response.encode('utf-8'))
        except ConnectionResetError:
            print(f"连接重置: {client_id}")
        except Exception as e:
            print(f"处理异常: {e}")
        finally:
            print(f"客户端断开: {client_id}")
            conn.close()
            with self.lock:
                if client_id in self.clients:
                    del self.clients[client_id]
    def process_message(self, client_id, message):
        """处理消息逻辑"""
        try:
            data = json.loads(message)
            msg_type = data.get('type', '')
            if msg_type == 'ping':
                return json.dumps({'type': 'pong', 'timestamp': datetime.now().isoformat()})
            elif msg_type == 'echo':
                return json.dumps({'type': 'echo', 'data': data.get('data', '')})
            else:
                return json.dumps({'type': 'unknown', 'message': '未知消息类型'})
        except json.JSONDecodeError:
            return json.dumps({'type': 'error', 'message': '无效的JSON格式'})
    def broadcast(self, message):
        """广播消息给所有客户端"""
        with self.lock:
            disconnected = []
            for client_id, (conn, addr) in self.clients.items():
                try:
                    conn.send(message.encode('utf-8'))
                except:
                    disconnected.append(client_id)
            # 清理断开的连接
            for client_id in disconnected:
                del self.clients[client_id]
    def start(self):
        """启动服务器"""
        server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        server.bind((self.host, self.port))
        server.listen(5)
        print(f"TCP服务器启动在 {self.host}:{self.port}")
        try:
            while True:
                conn, addr = server.accept()
                # 为每个客户端创建线程
                client_thread = threading.Thread(
                    target=self.handle_client, 
                    args=(conn, addr)
                )
                client_thread.daemon = True
                client_thread.start()
        except KeyboardInterrupt:
            print("服务器关闭")
        finally:
            server.close()
if __name__ == "__main__":
    server = TCPServer()
    server.start()

客户端代码

import socket
import json
import time
class TCPClient:
    def __init__(self, host='localhost', port=8888):
        self.host = host
        self.port = port
        self.sock = None
    def connect(self):
        """连接到服务器"""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        self.sock.connect((self.host, self.port))
        print(f"已连接到 {self.host}:{self.port}")
    def send_message(self, message):
        """发送消息"""
        if self.sock:
            self.sock.send(json.dumps(message).encode('utf-8'))
    def receive_message(self):
        """接收消息"""
        if self.sock:
            data = self.sock.recv(1024)
            return json.loads(data.decode('utf-8'))
        return None
    def close(self):
        """关闭连接"""
        if self.sock:
            self.sock.close()
            print("连接已关闭")
def main():
    client = TCPClient()
    try:
        client.connect()
        # 发送心跳
        client.send_message({'type': 'ping'})
        response = client.receive_message()
        print(f"收到: {response}")
        # 发送回声消息
        client.send_message({
            'type': 'echo', 
            'data': 'Hello TCP!'
        })
        response = client.receive_message()
        print(f"收到: {response}")
        # 保持连接
        while True:
            time.sleep(5)
            client.send_message({'type': 'ping'})
            response = client.receive_message()
            print(f"心跳响应: {response}")
    except KeyboardInterrupt:
        print("客户端关闭")
    finally:
        client.close()
if __name__ == "__main__":
    main()

使用FastAPI + WebSocket

安装依赖

pip install fastapi uvicorn websockets

服务端代码

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
from datetime import datetime
app = FastAPI()
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []
    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)
        print(f"新连接: {websocket.client}")
    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)
        print(f"断开连接: {websocket.client}")
    async def send_personal_message(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)
    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(f"收到来自 {client_id} 的消息: {data}")
            # 处理消息
            message_data = json.loads(data)
            if message_data.get('type') == 'ping':
                response = json.dumps({
                    'type': 'pong',
                    'timestamp': datetime.now().isoformat()
                })
                await manager.send_personal_message(response, websocket)
            elif message_data.get('type') == 'echo':
                response = json.dumps({
                    'type': 'echo',
                    'data': message_data.get('data', '')
                })
                await manager.send_personal_message(response, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"客户端 {client_id} 断开连接")
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

生产环境建议

异步处理

import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncServer:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=10)
    async def handle_task(self, data):
        """异步处理任务"""
        loop = asyncio.get_event_loop()
        # 在线程池中执行耗时操作
        result = await loop.run_in_executor(
            self.executor, 
            self.process_data, 
            data
        )
        return result
    def process_data(self, data):
        """实际的数据处理"""
        # 模拟耗时操作
        import time
        time.sleep(1)
        return f"Processed: {data}"

心跳检测

import asyncio
import time
class HeartbeatManager:
    def __init__(self, timeout=30):
        self.timeout = timeout
        self.last_heartbeat = {}
    def update_heartbeat(self, client_id):
        """更新心跳时间"""
        self.last_heartbeat[client_id] = time.time()
    async def check_heartbeats(self):
        """检查心跳超时"""
        while True:
            current_time = time.time()
            expired = []
            for client_id, last_time in self.last_heartbeat.items():
                if current_time - last_time > self.timeout:
                    expired.append(client_id)
            # 清理超时客户端
            for client_id in expired:
                del self.last_heartbeat[client_id]
                # 触发断开连接逻辑
                await self.handle_disconnect(client_id)
            await asyncio.sleep(5)
    async def handle_disconnect(self, client_id):
        """处理断开的客户端"""
        print(f"客户端 {client_id} 心跳超时,断开连接")

完整示例:带异常处理的长连接服务

import asyncio
import websockets
import json
import logging
from typing import Set, Dict
from datetime import datetime
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustWebSocketServer:
    def __init__(self, host='0.0.0.0', port=8765):
        self.host = host
        self.port = port
        self.clients: Set[websockets.WebSocketServerProtocol] = set()
        self.client_info: Dict[str, dict] = {}
    async def handler(self, websocket, path=None):
        """处理WebSocket连接的协程"""
        client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}"
        try:
            # 注册客户端
            self.clients.add(websocket)
            self.client_info[client_id] = {
                'connected_at': datetime.now(),
                'last_activity': datetime.now()
            }
            logger.info(f"客户端连接: {client_id}")
            # 发送欢迎消息
            await websocket.send(json.dumps({
                'type': 'welcome',
                'message': f'欢迎 {client_id}',
                'client_id': client_id
            }))
            # 处理消息循环
            async for message in websocket:
                self.client_info[client_id]['last_activity'] = datetime.now()
                try:
                    data = json.loads(message)
                    response = await self.process_message(client_id, data)
                    await websocket.send(json.dumps(response))
                except json.JSONDecodeError:
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': '无效的JSON格式'
                    }))
                except Exception as e:
                    logger.error(f"处理消息异常: {e}")
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': f'服务器内部错误: {str(e)}'
                    }))
        except websockets.exceptions.ConnectionClosed:
            logger.info(f"客户端断开连接: {client_id}")
        except Exception as e:
            logger.error(f"连接异常: {e}")
        finally:
            # 清理资源
            self.clients.discard(websocket)
            self.client_info.pop(client_id, None)
    async def process_message(self, client_id, data):
        """处理消息的业务逻辑"""
        msg_type = data.get('type', '')
        if msg_type == 'ping':
            return {
                'type': 'pong',
                'timestamp': datetime.now().isoformat(),
                'client_id': client_id
            }
        elif msg_type == 'echo':
            return {
                'type': 'echo',
                'data': data.get('data', ''),
                'timestamp': datetime.now().isoformat()
            }
        elif msg_type == 'broadcast':
            # 广播消息给所有客户端
            await self.broadcast({
                'type': 'broadcast',
                'from': client_id,
                'message': data.get('message', ''),
                'timestamp': datetime.now().isoformat()
            })
            return {'type': 'broadcast_ack', 'status': 'sent'}
        elif msg_type == 'get_clients':
            return {
                'type': 'clients_list',
                'clients': list(self.client_info.keys()),
                'count': len(self.client_info)
            }
        else:
            return {
                'type': 'unknown',
                'received_type': msg_type,
                'message': '未知的消息类型'
            }
    async def broadcast(self, message):
        """广播消息给所有连接的客户端"""
        if self.clients:
            tasks = []
            for client in self.clients:
                tasks.append(client.send(json.dumps(message)))
            results = await asyncio.gather(*tasks, return_exceptions=True)
            # 处理发送失败的客户端
            failed_clients = []
            for i, result in enumerate(results):
                if isinstance(result, Exception):
                    failed_clients.append(list(self.clients)[i])
            for client in failed_clients:
                self.clients.discard(client)
    async def health_check(self):
        """定期健康检查"""
        while True:
            try:
                # 发送心跳给所有客户端
                ping_message = json.dumps({'type': 'ping', 'server_time': datetime.now().isoformat()})
                await self.broadcast(ping_message)
                logger.info(f"健康检查: {len(self.clients)} 个客户端在线")
            except Exception as e:
                logger.error(f"健康检查异常: {e}")
            await asyncio.sleep(30)  # 每30秒检查一次
    async def start(self):
        """启动服务器"""
        logger.info(f"WebSocket服务器启动在 ws://{self.host}:{self.port}")
        async with websockets.serve(self.handler, self.host, self.port):
            # 启动健康检查
            health_task = asyncio.create_task(self.health_check())
            try:
                await asyncio.Future()  # 运行直到被中断
            except KeyboardInterrupt:
                logger.info("收到关闭信号")
            finally:
                health_task.cancel()
                logger.info("服务器关闭")
if __name__ == "__main__":
    server = RobustWebSocketServer()
    asyncio.run(server.start())

关键要点

  1. 选择合适的协议

    • WebSocket:适合浏览器应用,双向通信
    • TCP Socket:适合底层通信,性能更好
    • HTTP/2:适合需要流式传输的场景
  2. 连接管理

    • 维护连接池
    • 实现心跳机制
    • 处理重连逻辑
  3. 性能优化

    • 使用异步IO
    • 合理设置超时
    • 实现连接复用
  4. 错误处理

    • 完善的异常捕获
    • 优雅的断开处理
    • 日志记录

选择哪种方式取决于你的具体需求,WebSocket更通用,TCP Socket性能更好,FastAPI集成更方便。

抱歉,评论功能暂时关闭!