WebSocket实时通信全攻略:从原理到实践
WebSocket基础
WebSocket是一种在单个TCP连接上进行全双工通信的协议。与HTTP请求-响应模式不同,WebSocket允许服务器主动向客户端推送数据,实现真正的实时通信。
WebSocket vs HTTP
| 特性 | HTTP | WebSocket |
|---|---|---|
| 通信模式 | 请求-响应 | 全双工 |
| 连接 | 短连接(HTTP/1.1 可通过 Keep-Alive 复用连接,但仍是一问一答) | 长连接 |
| 开销 | 每次请求带Header | 握手后开销小 |
| 实时性 | 需要轮询 | 即时推送 |
握手过程
# 客户端请求
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

# 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
服务端实现
Node.js实现
const WebSocket = require('ws');
const http = require('http');
// Plain HTTP server; the WebSocket server below is attached to it.
const server = http.createServer();
// WebSocket server bound to the HTTP server through the `server` option.
const wss = new WebSocket.Server({ server: server });

// Registry of live connections, keyed by the generated user id.
const clients = new Map();
/**
 * Per-connection setup: registers the socket under a fresh user id, greets
 * the client, and wires up message, heartbeat, error and close handling.
 */
wss.on('connection', (ws, req) => {
  const userId = generateUserId();
  clients.set(userId, ws);
  console.log(`User ${userId} connected. Total: ${clients.size}`);

  // Greet the client and tell it which id it was assigned.
  ws.send(JSON.stringify({
    type: 'welcome',
    userId,
    message: 'Connected successfully'
  }));

  // Incoming messages: parsing and handling are guarded separately so that a
  // failure inside handleMessage is not misreported to the client as bad JSON.
  ws.on('message', (data) => {
    let message;
    try {
      message = JSON.parse(data);
    } catch (e) {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
      return;
    }

    try {
      handleMessage(ws, userId, message);
    } catch (err) {
      console.error(`Failed to handle message from user ${userId}:`, err);
      ws.send(JSON.stringify({ type: 'error', message: 'Internal error' }));
    }
  });

  // Heartbeat bookkeeping: the periodic sweep clears this flag before each
  // ping, and the pong reply sets it again.
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });

  // Without a listener, an 'error' event on the socket is an unhandled
  // EventEmitter error; log it instead.
  ws.on('error', (err) => {
    console.error(`WebSocket error for user ${userId}:`, err);
  });

  // Disconnect: drop the socket from the registry and notify everyone else.
  ws.on('close', () => {
    clients.delete(userId);
    broadcast({ type: 'user_left', userId });
    console.log(`User ${userId} disconnected`);
  });
});
// Heartbeat sweep, every 30 s: a socket that has not answered the previous
// ping (isAlive still false) is terminated; every other socket is marked as
// pending and pinged again.
const heartbeatInterval = setInterval(() => {
  for (const ws of wss.clients) {
    if (!ws.isAlive) {
      ws.terminate();
      continue;
    }
    ws.isAlive = false;
    ws.ping();
  }
}, 30000);

// Stop the sweep once the server closes; otherwise the timer keeps running
// (and keeps the process alive) after shutdown.
wss.on('close', () => {
  clearInterval(heartbeatInterval);
});
/**
 * Send one message to every client of `wss` whose socket is currently open.
 *
 * @param {*} message - Value to serialize with JSON.stringify; it is
 *   serialized once and the same string is sent to each client.
 */
function broadcast(message) {
  const payload = JSON.stringify(message);
  for (const socket of wss.clients) {
    // Skip sockets that are still connecting, closing or already closed.
    if (socket.readyState !== WebSocket.OPEN) {
      continue;
    }
    socket.send(payload);
  }
}
// Start accepting connections on port 8080 and log once the server is listening.
server.listen(8080, function onListening() {
  console.log('WebSocket server running on port 8080');
});
Python实现
import asyncio
import websockets
import json
from datetime import datetime
# Set of currently registered connections: handler() adds and removes entries,
# broadcast() sends to every member.
connected_clients = set()
async def handler(websocket, path=None):
    """Serve one client connection from registration to cleanup.

    Registers the connection, sends it a welcome message, announces the new
    client to everyone, then processes incoming messages until the peer
    disconnects. On exit the connection is unregistered and the departure is
    broadcast.

    Args:
        websocket: The connection object supplied by ``websockets.serve``.
        path: Unused. Optional so the handler works both with ``websockets``
            releases that pass ``(websocket, path)`` and with releases that
            pass only the connection.
    """
    # Register the client; id() of the connection object doubles as client id.
    connected_clients.add(websocket)
    client_id = id(websocket)
    try:
        # Welcome message for the new client.
        await websocket.send(json.dumps({
            'type': 'welcome',
            'clientId': client_id,
            'timestamp': datetime.now().isoformat()
        }))
        # Tell everyone that a new client joined.
        await broadcast({
            'type': 'user_joined',
            'clientId': client_id,
            'count': len(connected_clients)
        })
        # Message loop: ends when the connection is closed.
        async for message in websocket:
            try:
                data = json.loads(message)
            except json.JSONDecodeError:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Invalid JSON'
                }))
                continue
            await handle_message(websocket, data)
    except websockets.exceptions.ConnectionClosed:
        # A closed connection is the normal way for this loop to end.
        pass
    finally:
        # discard() rather than remove(): cleanup must not raise KeyError if
        # the entry is already gone.
        connected_clients.discard(websocket)
        await broadcast({
            'type': 'user_left',
            'clientId': client_id,
            'count': len(connected_clients)
        })
async def broadcast(message):
    """Send ``message`` (JSON-encoded) to every registered client.

    The payload is serialized once. Sends run concurrently, and a failure on
    one connection (for example a peer that has just disconnected) does not
    prevent delivery to the others or propagate to the caller.

    Args:
        message: JSON-serializable object to send.
    """
    if not connected_clients:
        return
    payload = json.dumps(message)
    # Snapshot the set so membership changes during the awaits cannot affect
    # this round of sends.
    recipients = list(connected_clients)
    await asyncio.gather(
        *(client.send(payload) for client in recipients),
        return_exceptions=True,
    )
async def handle_message(websocket, data):
    """Dispatch one decoded client message by its ``type`` field.

    ``chat`` messages are rebroadcast to all clients together with the
    sender's id and a timestamp; ``ping`` is answered with ``pong`` on the
    same connection; any other type is ignored.

    Args:
        websocket: Connection the message arrived on.
        data: The decoded JSON payload. Payloads that are valid JSON but not
            an object (list, string, number, ...) are answered with an error
            message instead of failing on ``.get``.
    """
    if not isinstance(data, dict):
        await websocket.send(json.dumps({
            'type': 'error',
            'message': 'Message must be a JSON object'
        }))
        return

    msg_type = data.get('type')
    if msg_type == 'chat':
        await broadcast({
            'type': 'chat',
            'clientId': id(websocket),
            'content': data.get('content'),
            'timestamp': datetime.now().isoformat()
        })
    elif msg_type == 'ping':
        await websocket.send(json.dumps({'type': 'pong'}))
async def main():
    """Run the WebSocket server on localhost:8080 until the process is stopped."""
    async with websockets.serve(handler, 'localhost', 8080):
        # A future that is never resolved keeps the server running; the
        # context manager closes the server when the task is cancelled.
        await asyncio.Future()


# asyncio.run creates and closes the event loop itself, replacing the
# deprecated get_event_loop().run_until_complete()/run_forever() pair.
asyncio.run(main())
客户端实现
浏览器原生API
/**
 * Wrapper around the native WebSocket API that adds JSON encoding/decoding,
 * a minimal event emitter and automatic reconnection with exponential backoff.
 *
 * Events: 'connected', 'message' (parsed JSON payload), 'error' (the native
 * error event) and 'disconnected'.
 */
class WebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint, e.g. 'ws://localhost:8080'.
   */
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    // Base delay in ms; doubled for each consecutive reconnect attempt.
    this.reconnectDelay = 1000;
    this.listeners = new Map();
    // Handle of the pending reconnect timer, or null when none is scheduled.
    this.reconnectTimer = null;
    // Set by close() so an intentional disconnect does not trigger a reconnect.
    this.closedByUser = false;
  }

  /**
   * Open the connection. Does nothing if a socket is already connecting or
   * open, so repeated calls cannot create duplicate sockets.
   */
  connect() {
    if (
      this.ws &&
      (this.ws.readyState === WebSocket.CONNECTING ||
        this.ws.readyState === WebSocket.OPEN)
    ) {
      return;
    }

    this.closedByUser = false;
    const socket = new WebSocket(this.url);
    this.ws = socket;

    socket.onopen = () => {
      console.log('Connected');
      this.reconnectAttempts = 0;
      this.emit('connected');
    };

    socket.onmessage = (event) => {
      let data;
      try {
        data = JSON.parse(event.data);
      } catch (e) {
        console.error('Parse error:', e);
        return;
      }
      // Emit outside the try block: an exception thrown by a listener is a
      // listener bug and must not be logged as a parse error.
      this.emit('message', data);
    };

    socket.onerror = (error) => {
      console.error('WebSocket error:', error);
      this.emit('error', error);
    };

    socket.onclose = () => {
      // Ignore close events from a socket that has already been replaced.
      if (this.ws !== socket) return;
      console.log('Disconnected');
      this.emit('disconnected');
      if (!this.closedByUser) {
        this.attemptReconnect();
      }
    };
  }

  /**
   * Serialize `data` as JSON and send it if the socket is open.
   *
   * @param {*} data - JSON-serializable payload.
   * @returns {boolean} true if the message was handed to the socket, false if
   *   it was dropped because the connection is not open.
   */
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
      return true;
    }
    return false;
  }

  /**
   * Close the connection on purpose: cancels any scheduled reconnect and
   * suppresses the automatic reconnect that a close would otherwise trigger.
   *
   * @param {number} [code] - Close code forwarded to WebSocket.close().
   * @param {string} [reason] - Close reason forwarded to WebSocket.close().
   */
  close(code, reason) {
    this.closedByUser = true;
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
    if (this.ws) {
      this.ws.close(code, reason);
    }
  }

  /**
   * Schedule the next reconnect with exponential backoff
   * (reconnectDelay * 2^(attempt - 1)); gives up after maxReconnectAttempts.
   */
  attemptReconnect() {
    if (this.reconnectAttempts >= this.maxReconnectAttempts) {
      return;
    }
    this.reconnectAttempts++;
    const delay = this.reconnectDelay * 2 ** (this.reconnectAttempts - 1);
    console.log(`Reconnecting in ${delay}ms...`);
    this.reconnectTimer = setTimeout(() => {
      this.reconnectTimer = null;
      this.connect();
    }, delay);
  }

  /**
   * Register a listener for an event.
   *
   * @param {string} event - Event name.
   * @param {Function} callback - Invoked with the event payload.
   */
  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);
  }

  /**
   * Remove a listener previously registered with on().
   *
   * @param {string} event - Event name.
   * @param {Function} callback - The exact function passed to on().
   */
  off(event, callback) {
    const handlers = this.listeners.get(event);
    if (!handlers) return;
    const index = handlers.indexOf(callback);
    if (index !== -1) {
      handlers.splice(index, 1);
    }
  }

  /**
   * Invoke every listener registered for `event` with `data`.
   *
   * @param {string} event - Event name.
   * @param {*} [data] - Payload passed to each listener.
   */
  emit(event, data) {
    const handlers = this.listeners.get(event);
    if (!handlers) return;
    // Iterate over a copy so a listener may add or remove listeners safely.
    for (const cb of handlers.slice()) {
      cb(data);
    }
  }
}
// Usage example: connect to the local server and log every parsed message.
const client = new WebSocketClient('ws://localhost:8080');

const logIncoming = (data) => {
  console.log('Received:', data);
};
client.on('message', logIncoming);

client.connect();
实战:实时聊天应用
服务端
// Room registry: room name -> Set of sockets currently joined to that room.
const rooms = new Map();

/**
 * Chat connection handler. Each socket can be in at most one room at a time;
 * supported message types are 'join', 'chat' and 'typing'.
 */
wss.on('connection', (ws) => {
  let currentRoom = null;
  let username = null;

  const sendError = (message) => {
    ws.send(JSON.stringify({ type: 'error', message }));
  };

  // Remove this socket from its current room (if any), tell the remaining
  // members, and drop the room once it is empty so the map does not grow
  // without bound.
  const leaveCurrentRoom = () => {
    if (!currentRoom || !rooms.has(currentRoom)) return;
    const members = rooms.get(currentRoom);
    members.delete(ws);
    broadcastToRoom(currentRoom, {
      type: 'system',
      message: `${username} left the room`
    });
    if (members.size === 0) {
      rooms.delete(currentRoom);
    }
    currentRoom = null;
  };

  ws.on('message', (data) => {
    // Client input is untrusted: malformed JSON must not throw out of the
    // event handler.
    let msg;
    try {
      msg = JSON.parse(data);
    } catch (e) {
      sendError('Invalid JSON');
      return;
    }
    if (msg === null || typeof msg !== 'object') {
      sendError('Invalid message');
      return;
    }

    switch (msg.type) {
      case 'join':
        // Without a room the socket would be stored under a key that the
        // close handler never cleans up.
        if (!msg.room) {
          sendError('Room is required');
          break;
        }
        // Joining while already in a room means switching rooms.
        leaveCurrentRoom();
        username = msg.username;
        currentRoom = msg.room;
        if (!rooms.has(currentRoom)) {
          rooms.set(currentRoom, new Set());
        }
        rooms.get(currentRoom).add(ws);
        broadcastToRoom(currentRoom, {
          type: 'system',
          message: `${username} joined the room`
        });
        break;

      case 'chat':
        if (currentRoom) {
          broadcastToRoom(currentRoom, {
            type: 'chat',
            username,
            content: msg.content,
            timestamp: Date.now()
          });
        }
        break;

      case 'typing':
        if (currentRoom) {
          // The sender is excluded: it does not need its own typing notice.
          broadcastToRoom(currentRoom, {
            type: 'typing',
            username
          }, ws);
        }
        break;
    }
  });

  // Without a listener, an 'error' event on the socket is an unhandled
  // EventEmitter error; log it instead.
  ws.on('error', (err) => {
    console.error('WebSocket error:', err);
  });

  ws.on('close', () => {
    leaveCurrentRoom();
  });
});
/**
 * Send a message to every open socket in a room.
 *
 * @param {*} roomId - Key of the room in the `rooms` map; unknown rooms are
 *   ignored.
 * @param {*} message - Value to serialize with JSON.stringify; serialized once.
 * @param {*} [excludeWs=null] - Socket to skip, typically the sender.
 */
function broadcastToRoom(roomId, message, excludeWs = null) {
  const members = rooms.get(roomId);
  if (!members) {
    return;
  }

  const payload = JSON.stringify(message);
  for (const member of members) {
    if (member === excludeWs) continue;
    if (member.readyState !== WebSocket.OPEN) continue;
    member.send(payload);
  }
}
最佳实践
- 心跳检测:定期ping/pong检测连接状态
- 断线重连:实现指数退避重连策略
- 消息确认:重要消息需要ACK机制
- 安全性:使用wss协议,验证token
- 限流:防止消息洪水攻击
- 监控:记录连接数和消息量
WebSocket是实现实时应用的利器,合理使用能够构建流畅的实时交互体验。
本文链接:https://www.kkkliao.cn/?id=751 转载需授权!
版权声明:本文由廖万里的博客发布,如需转载请注明出处。



手机流量卡
免费领卡
号卡合伙人
产品服务
关于本站
