本文目录导读:

本文详细介绍如何用 Python 搭建长连接服务,涵盖三种方式:基于 websockets 库的 WebSocket、原始 TCP Socket,以及 FastAPI + WebSocket。
使用WebSocket搭建长连接服务
安装依赖
pip install websockets
服务端代码
import asyncio
import websockets
import json
from datetime import datetime
class WebSocketServer:
    """Minimal WebSocket server that answers JSON ``ping`` / ``echo`` messages.

    Every open connection is kept in ``self.clients`` so that ``broadcast``
    can reach all of them.
    """

    def __init__(self, host='localhost', port=8765):
        """Store the listen address; no socket is opened until ``start``."""
        self.host = host
        self.port = port
        self.clients = set()  # every currently registered client connection

    async def handler(self, websocket, path=None):
        """Serve one client connection until it closes.

        Args:
            websocket: The connection object handed over by ``websockets``.
            path: Request path. Optional because newer ``websockets``
                releases call the handler with the connection only, while
                older ones also pass the path.
        """
        self.clients.add(websocket)
        client_addr = websocket.remote_address
        print(f"[{datetime.now()}] 新客户端连接: {client_addr}")
        try:
            async for message in websocket:
                print(f"收到消息: {message}")
                response = await self.process_message(websocket, message)
                await websocket.send(json.dumps(response))
        except websockets.exceptions.ConnectionClosed:
            print(f"客户端断开: {client_addr}")
        finally:
            # discard() never raises, so cleanup cannot mask an earlier error.
            self.clients.discard(websocket)

    async def process_message(self, websocket, message):
        """Decode one incoming frame and build the reply dictionary.

        Args:
            websocket: The sending connection (unused by the built-in types).
            message: Raw frame payload expected to contain a JSON object.

        Returns:
            A dict with a ``type`` of ``pong``, ``echo``, ``unknown`` or
            ``error``.
        """
        try:
            data = json.loads(message)
        except (json.JSONDecodeError, UnicodeDecodeError):
            return {'type': 'error', 'message': '无效的JSON格式'}

        # Valid JSON that is not an object (list, number, ...) carries no
        # "type" field, so it is reported as an unknown message instead of
        # crashing on ``.get``.
        msg_type = data.get('type', '') if isinstance(data, dict) else ''
        if msg_type == 'ping':
            return {'type': 'pong', 'timestamp': datetime.now().isoformat()}
        if msg_type == 'echo':
            return {'type': 'echo', 'data': data.get('data', '')}
        return {'type': 'unknown', 'message': '未知消息类型'}

    async def broadcast(self, message):
        """Send ``message`` (JSON-encoded) to every registered client.

        Failures of individual sends are collected by ``gather`` and ignored.
        """
        recipients = list(self.clients)
        if not recipients:
            return
        print(f"广播消息给 {len(recipients)} 个客户端")
        payload = json.dumps(message)  # serialise once, not once per client
        await asyncio.gather(
            *(client.send(payload) for client in recipients),
            return_exceptions=True,
        )

    async def start(self):
        """Listen on ``host:port`` and serve until the task is cancelled."""
        print(f"WebSocket服务器启动在 ws://{self.host}:{self.port}")
        async with websockets.serve(self.handler, self.host, self.port):
            await asyncio.Future()  # never completes: run until interrupted
async def main():
    """Build a WebSocketServer with default settings and serve forever."""
    await WebSocketServer().start()


if __name__ == "__main__":
    # Script entry point: run the server on a fresh event loop.
    asyncio.run(main())
客户端代码
import asyncio
import websockets
import json
import time
async def client():
    """Connect to the local WebSocket server and exchange JSON messages.

    Sends one ``ping`` and one ``echo`` request, printing each reply, then
    keeps the connection open by sending a ``ping`` every five seconds.
    """
    uri = "ws://localhost:8765"

    async def exchange(connection, payload):
        """Send ``payload`` as JSON and return the next frame received."""
        await connection.send(json.dumps(payload))
        return await connection.recv()

    async with websockets.connect(uri) as websocket:
        print("已连接到服务器")

        # Initial heartbeat.
        reply = await exchange(websocket, {'type': 'ping'})
        print(f"收到: {reply}")

        # Echo round trip.
        reply = await exchange(
            websocket,
            {'type': 'echo', 'data': 'Hello WebSocket!'},
        )
        print(f"收到: {reply}")

        # Stay connected, sending a heartbeat on a fixed interval.
        while True:
            await asyncio.sleep(5)
            reply = await exchange(websocket, {'type': 'ping'})
            print(f"心跳响应: {reply}")


if __name__ == "__main__":
    asyncio.run(client())
使用Socket搭建TCP长连接服务
服务端代码
import socket
import threading
import json
from datetime import datetime
class TCPServer:
    """Thread-per-connection TCP server speaking a small JSON protocol.

    NOTE: there is no message framing. Each ``recv`` chunk of up to 1024
    bytes is treated as one complete JSON message, so larger or coalesced
    messages are not reassembled.
    """

    def __init__(self, host='localhost', port=8888):
        """Store the listen address; the socket is created in ``start``."""
        self.host = host
        self.port = port
        self.clients = {}  # client_id -> (conn, addr)
        self.lock = threading.Lock()  # guards self.clients

    def handle_client(self, conn, addr):
        """Serve one client until it disconnects; runs in its own thread.

        Args:
            conn: Connected socket for this client.
            addr: ``(host, port)`` pair of the peer, used to build its id.
        """
        client_id = f"{addr[0]}:{addr[1]}"
        print(f"[{datetime.now()}] 新连接: {client_id}")
        with self.lock:
            self.clients[client_id] = (conn, addr)
        try:
            while True:
                data = conn.recv(1024)
                if not data:
                    break  # empty read: the peer closed the connection
                message = data.decode('utf-8')
                print(f"收到来自 {client_id} 的消息: {message}")
                response = self.process_message(client_id, message)
                # sendall() keeps writing until every byte is out; plain
                # send() may stop after a partial write.
                conn.sendall(response.encode('utf-8'))
        except ConnectionResetError:
            print(f"连接重置: {client_id}")
        except Exception as e:
            print(f"处理异常: {e}")
        finally:
            print(f"客户端断开: {client_id}")
            conn.close()
            with self.lock:
                self.clients.pop(client_id, None)

    def process_message(self, client_id, message):
        """Build the JSON-encoded reply for one decoded text message.

        Args:
            client_id: Identifier of the sender (unused by built-in types).
            message: Text expected to contain a JSON object.

        Returns:
            A JSON string whose ``type`` is ``pong``, ``echo``, ``unknown``
            or ``error``.
        """
        try:
            data = json.loads(message)
        except json.JSONDecodeError:
            return json.dumps({'type': 'error', 'message': '无效的JSON格式'})

        # Valid JSON that is not an object has no "type"; report it as an
        # unknown message rather than letting ``.get`` raise.
        msg_type = data.get('type', '') if isinstance(data, dict) else ''
        if msg_type == 'ping':
            reply = {'type': 'pong', 'timestamp': datetime.now().isoformat()}
        elif msg_type == 'echo':
            reply = {'type': 'echo', 'data': data.get('data', '')}
        else:
            reply = {'type': 'unknown', 'message': '未知消息类型'}
        return json.dumps(reply)

    def broadcast(self, message):
        """Send the text ``message`` to every registered client.

        Clients whose socket raises ``OSError`` while sending are removed
        from ``self.clients``.
        """
        payload = message.encode('utf-8')
        # Snapshot under the lock, then send without holding it so one slow
        # client cannot stall threads that need to register or unregister.
        with self.lock:
            recipients = [
                (client_id, conn)
                for client_id, (conn, _addr) in self.clients.items()
            ]

        failed = []
        for client_id, conn in recipients:
            try:
                conn.sendall(payload)
            except OSError:
                failed.append((client_id, conn))

        if failed:
            with self.lock:
                for client_id, conn in failed:
                    entry = self.clients.get(client_id)
                    # Only drop the entry if it still refers to the socket
                    # that failed (the id may have been re-registered).
                    if entry is not None and entry[0] is conn:
                        del self.clients[client_id]

    def start(self):
        """Bind, listen and accept clients until interrupted with Ctrl+C."""
        # The with-block closes the listening socket on every exit path,
        # including a failing bind().
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server:
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind((self.host, self.port))
            server.listen(5)
            print(f"TCP服务器启动在 {self.host}:{self.port}")
            try:
                while True:
                    conn, addr = server.accept()
                    # One daemon thread per client, so worker threads do not
                    # keep the process alive after the accept loop ends.
                    threading.Thread(
                        target=self.handle_client,
                        args=(conn, addr),
                        daemon=True,
                    ).start()
            except KeyboardInterrupt:
                print("服务器关闭")
if __name__ == "__main__":
    # Script entry point: serve on the default host and port until stopped.
    TCPServer().start()
客户端代码
import socket
import json
import time
class TCPClient:
    """Blocking TCP client that exchanges JSON messages with a server.

    NOTE: there is no message framing. ``receive_message`` treats one
    ``recv`` chunk of up to 1024 bytes as one complete JSON document.
    """

    def __init__(self, host='localhost', port=8888):
        """Store the server address; no connection is made yet."""
        self.host = host
        self.port = port
        self.sock = None  # connected socket, or None while disconnected

    def connect(self):
        """Open the TCP connection to ``host:port``.

        Raises:
            OSError: If the connection attempt fails; the half-created
                socket is closed and ``self.sock`` is left unchanged.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.connect((self.host, self.port))
        except OSError:
            sock.close()
            raise
        self.sock = sock
        print(f"已连接到 {self.host}:{self.port}")

    def send_message(self, message):
        """JSON-encode ``message`` and send it; no-op when not connected."""
        if self.sock:
            # sendall() retries until the whole payload is written.
            self.sock.sendall(json.dumps(message).encode('utf-8'))

    def receive_message(self):
        """Read and decode one JSON message.

        Returns:
            The decoded object, or ``None`` when not connected.

        Raises:
            ConnectionError: If the server closed the connection.
            json.JSONDecodeError: If the received bytes are not valid JSON.
        """
        if not self.sock:
            return None
        data = self.sock.recv(1024)
        if not data:
            # An empty read means the peer closed; report that explicitly
            # instead of failing later with a JSON parse error on "".
            raise ConnectionError("服务器已关闭连接")
        return json.loads(data.decode('utf-8'))

    def close(self):
        """Close the connection if one is open; safe to call repeatedly."""
        if self.sock:
            self.sock.close()
            self.sock = None  # later calls become harmless no-ops
            print("连接已关闭")
def main():
    """Run the demo TCP client: ping, echo, then heartbeat every 5 seconds.

    Ctrl+C stops the heartbeat loop; the connection is closed on every exit
    path.
    """
    client = TCPClient()

    def round_trip(payload):
        """Send ``payload`` and return the server's decoded reply."""
        client.send_message(payload)
        return client.receive_message()

    try:
        client.connect()

        # Initial heartbeat.
        reply = round_trip({'type': 'ping'})
        print(f"收到: {reply}")

        # Echo request.
        reply = round_trip({'type': 'echo', 'data': 'Hello TCP!'})
        print(f"收到: {reply}")

        # Keep the connection alive with periodic heartbeats.
        while True:
            time.sleep(5)
            reply = round_trip({'type': 'ping'})
            print(f"心跳响应: {reply}")
    except KeyboardInterrupt:
        print("客户端关闭")
    finally:
        client.close()


if __name__ == "__main__":
    main()
使用FastAPI + WebSocket
安装依赖
pip install fastapi uvicorn websockets
服务端代码
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
import json
from datetime import datetime
# Application instance; the WebSocket route below is registered on it.
app = FastAPI()
class ConnectionManager:
    """Registry of accepted WebSocket connections with send helpers."""

    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        """Accept the handshake and register the connection."""
        await websocket.accept()
        self.active_connections.append(websocket)
        print(f"新连接: {websocket.client}")

    def disconnect(self, websocket: WebSocket):
        """Unregister ``websocket``; a connection already removed is ignored."""
        try:
            self.active_connections.remove(websocket)
        except ValueError:
            # Already removed (e.g. dropped by broadcast()); nothing to do.
            return
        print(f"断开连接: {websocket.client}")

    async def send_personal_message(self, message: str, websocket: WebSocket):
        """Send ``message`` as a text frame to a single connection."""
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        """Send ``message`` to every registered connection.

        A connection that fails while sending is unregistered, and delivery
        continues with the remaining ones.
        """
        # Iterate over a copy: disconnect() mutates the list, and other
        # handlers may register or unregister while a send is awaited.
        for connection in list(self.active_connections):
            try:
                await connection.send_text(message)
            except (WebSocketDisconnect, RuntimeError):
                # Presumably raised when sending on a socket that has already
                # gone away - confirm against the Starlette WebSocket docs.
                self.disconnect(connection)
# Single shared registry used by the WebSocket endpoint below.
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    """Serve one WebSocket client identified by ``client_id``.

    Replies to JSON ``ping`` and ``echo`` messages; other message types are
    ignored. Malformed JSON gets an ``error`` reply instead of ending the
    connection. When the client disconnects, the remaining clients are
    notified.
    """
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            print(f"收到来自 {client_id} 的消息: {data}")

            try:
                message_data = json.loads(data)
            except json.JSONDecodeError:
                response = json.dumps({
                    'type': 'error',
                    'message': '无效的JSON格式'
                })
                await manager.send_personal_message(response, websocket)
                continue

            # Valid JSON that is not an object carries no "type" field.
            msg_type = (
                message_data.get('type')
                if isinstance(message_data, dict)
                else None
            )
            if msg_type == 'ping':
                response = json.dumps({
                    'type': 'pong',
                    'timestamp': datetime.now().isoformat()
                })
                await manager.send_personal_message(response, websocket)
            elif msg_type == 'echo':
                response = json.dumps({
                    'type': 'echo',
                    'data': message_data.get('data', '')
                })
                await manager.send_personal_message(response, websocket)
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"客户端 {client_id} 断开连接")
    except Exception:
        # Any other failure must still unregister the connection, otherwise
        # later broadcasts keep trying to write to a dead socket.
        manager.disconnect(websocket)
        raise
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when run as a script.
    import uvicorn
    # Serve the app on all IPv4 interfaces, port 8000.
    uvicorn.run(app, host="0.0.0.0", port=8000)
生产环境建议
异步处理
import asyncio
from concurrent.futures import ThreadPoolExecutor
class AsyncServer:
    """Runs blocking work on a thread pool so the event loop stays free."""

    def __init__(self, max_workers=10):
        """Create the worker pool.

        Args:
            max_workers: Upper bound on concurrently running worker threads.
        """
        self.executor = ThreadPoolExecutor(max_workers=max_workers)

    async def handle_task(self, data):
        """Run ``process_data(data)`` in the pool and return its result."""
        # get_running_loop() is the supported way to obtain the loop from
        # inside a coroutine; get_event_loop() is discouraged there.
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(
            self.executor,
            self.process_data,
            data,
        )

    def process_data(self, data):
        """Blocking work; executed on a worker thread by ``handle_task``."""
        # Simulate a slow operation.
        import time
        time.sleep(1)
        return f"Processed: {data}"

    def close(self):
        """Shut the pool down, waiting for running tasks to finish."""
        self.executor.shutdown(wait=True)
心跳检测
import asyncio
import time
class HeartbeatManager:
    """Tracks the last heartbeat per client and expires silent clients."""

    def __init__(self, timeout=30, check_interval=5):
        """Configure the expiry policy.

        Args:
            timeout: Seconds without a heartbeat before a client expires.
            check_interval: Seconds between two expiry scans.
        """
        self.timeout = timeout
        self.check_interval = check_interval
        # client_id -> time.monotonic() reading of the latest heartbeat
        self.last_heartbeat = {}

    def update_heartbeat(self, client_id):
        """Record that ``client_id`` has just sent a heartbeat."""
        # The monotonic clock is immune to wall-clock adjustments, which
        # could otherwise expire clients early or keep them alive too long.
        self.last_heartbeat[client_id] = time.monotonic()

    async def check_heartbeats(self):
        """Scan forever, dropping clients whose heartbeat is overdue."""
        while True:
            now = time.monotonic()
            expired = [
                client_id
                for client_id, last_time in self.last_heartbeat.items()
                if now - last_time > self.timeout
            ]
            for client_id in expired:
                # pop() instead of del: the entry may have been removed
                # while an earlier handle_disconnect() was being awaited.
                self.last_heartbeat.pop(client_id, None)
                await self.handle_disconnect(client_id)
            await asyncio.sleep(self.check_interval)

    async def handle_disconnect(self, client_id):
        """Hook called for each expired client; reports the timeout."""
        print(f"客户端 {client_id} 心跳超时,断开连接")
完整示例:带异常处理的长连接服务
import asyncio
import websockets
import json
import logging
from typing import Set, Dict
from datetime import datetime
# Configure root logging at INFO level.
logging.basicConfig(level=logging.INFO)
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
class RobustWebSocketServer:
    """WebSocket server with per-client bookkeeping, broadcast and pings.

    Supported message types are ``ping``, ``echo``, ``broadcast`` and
    ``get_clients``; anything else gets an ``unknown`` reply.
    """

    def __init__(self, host='0.0.0.0', port=8765):
        """Store the listen address; nothing is opened until ``start``."""
        self.host = host
        self.port = port
        self.clients: Set[websockets.WebSocketServerProtocol] = set()
        # client_id -> {'connected_at': datetime, 'last_activity': datetime}
        self.client_info: Dict[str, dict] = {}

    async def handler(self, websocket, path=None):
        """Serve one connection: register, greet, answer, then clean up.

        Args:
            websocket: The connection object handed over by ``websockets``.
            path: Request path; unused, accepted for compatibility.
        """
        client_id = f"{websocket.remote_address[0]}:{websocket.remote_address[1]}"
        try:
            # Register the client.
            self.clients.add(websocket)
            self.client_info[client_id] = {
                'connected_at': datetime.now(),
                'last_activity': datetime.now()
            }
            logger.info(f"客户端连接: {client_id}")

            # Greet the client and tell it its id.
            await websocket.send(json.dumps({
                'type': 'welcome',
                'message': f'欢迎 {client_id}',
                'client_id': client_id
            }))

            async for message in websocket:
                self.client_info[client_id]['last_activity'] = datetime.now()
                try:
                    data = json.loads(message)
                    response = await self.process_message(client_id, data)
                    await websocket.send(json.dumps(response))
                except json.JSONDecodeError:
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': '无效的JSON格式'
                    }))
                except websockets.exceptions.ConnectionClosed:
                    # The peer is gone: skip the generic error reply (it
                    # would only fail again) and let the outer handler log.
                    raise
                except Exception as e:
                    logger.error(f"处理消息异常: {e}")
                    await websocket.send(json.dumps({
                        'type': 'error',
                        'message': f'服务器内部错误: {str(e)}'
                    }))
        except websockets.exceptions.ConnectionClosed:
            logger.info(f"客户端断开连接: {client_id}")
        except Exception as e:
            logger.error(f"连接异常: {e}")
        finally:
            # Always release the bookkeeping for this connection.
            self.clients.discard(websocket)
            self.client_info.pop(client_id, None)

    async def process_message(self, client_id, data):
        """Build the reply for one decoded message.

        Args:
            client_id: Identifier of the sending client.
            data: The decoded JSON value, expected to be a dict.

        Returns:
            The reply dict to send back to the sender.
        """
        # Valid JSON that is not an object has no "type"; treat it as an
        # unknown message instead of failing on ``.get``.
        msg_type = data.get('type', '') if isinstance(data, dict) else ''
        if msg_type == 'ping':
            return {
                'type': 'pong',
                'timestamp': datetime.now().isoformat(),
                'client_id': client_id
            }
        if msg_type == 'echo':
            return {
                'type': 'echo',
                'data': data.get('data', ''),
                'timestamp': datetime.now().isoformat()
            }
        if msg_type == 'broadcast':
            # Relay to every connected client, then acknowledge the sender.
            await self.broadcast({
                'type': 'broadcast',
                'from': client_id,
                'message': data.get('message', ''),
                'timestamp': datetime.now().isoformat()
            })
            return {'type': 'broadcast_ack', 'status': 'sent'}
        if msg_type == 'get_clients':
            return {
                'type': 'clients_list',
                'clients': list(self.client_info.keys()),
                'count': len(self.client_info)
            }
        return {
            'type': 'unknown',
            'received_type': msg_type,
            'message': '未知的消息类型'
        }

    async def broadcast(self, message):
        """JSON-encode ``message`` and send it to every connected client.

        Clients whose send raised are removed from ``self.clients``.
        """
        # Snapshot first: the set can change while the sends are awaited,
        # and each result must be matched with the client it belongs to.
        recipients = list(self.clients)
        if not recipients:
            return
        payload = json.dumps(message)  # serialise once for all recipients
        results = await asyncio.gather(
            *(client.send(payload) for client in recipients),
            return_exceptions=True,
        )
        for client, result in zip(recipients, results):
            if isinstance(result, Exception):
                self.clients.discard(client)

    async def health_check(self):
        """Send a ``ping`` message to all clients every 30 seconds."""
        while True:
            try:
                # Pass the dict itself: broadcast() does the JSON encoding,
                # so encoding here too would double-encode the frame.
                await self.broadcast({
                    'type': 'ping',
                    'server_time': datetime.now().isoformat()
                })
                logger.info(f"健康检查: {len(self.clients)} 个客户端在线")
            except Exception as e:
                logger.error(f"健康检查异常: {e}")
            await asyncio.sleep(30)  # scan interval in seconds

    async def start(self):
        """Serve on ``host:port`` and run the periodic health check."""
        logger.info(f"WebSocket服务器启动在 ws://{self.host}:{self.port}")
        async with websockets.serve(self.handler, self.host, self.port):
            health_task = asyncio.create_task(self.health_check())
            try:
                await asyncio.Future()  # never completes: run until stopped
            except KeyboardInterrupt:
                logger.info("收到关闭信号")
            finally:
                health_task.cancel()
                logger.info("服务器关闭")
if __name__ == "__main__":
    # Script entry point: serve with the default host and port.
    asyncio.run(RobustWebSocketServer().start())
关键要点
- 选择合适的协议
  - WebSocket:适合浏览器应用,双向通信
  - TCP Socket:适合底层通信,性能更好
  - HTTP/2:适合需要流式传输的场景
- 连接管理
  - 维护连接池
  - 实现心跳机制
  - 处理重连逻辑
- 性能优化
  - 使用异步IO
  - 合理设置超时
  - 实现连接复用
- 错误处理
  - 完善的异常捕获
  - 优雅的断开处理
  - 日志记录
选择哪种方式取决于你的具体需求:WebSocket 更通用,TCP Socket 性能更好,FastAPI 集成更方便。