当前位置:首页 > 未命名 > 正文内容

WebSocket实时通信全攻略:从原理到实践

廖万里9小时前未命名1

WebSocket基础

WebSocket是一种在单个TCP连接上进行全双工通信的协议。与HTTP请求-响应模式不同,WebSocket允许服务器主动向客户端推送数据,实现真正的实时通信。

WebSocket vs HTTP

特性HTTPWebSocket
通信模式请求-响应全双工
连接短连接长连接
开销每次请求带Header握手后开销小
实时性需要轮询即时推送

握手过程

# 客户端请求
GET /chat HTTP/1.1
Host: example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

# 服务器响应
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

服务端实现

Node.js实现

const WebSocket = require('ws');
const http = require('http');

// 创建HTTP服务器
const server = http.createServer();
const wss = new WebSocket.Server({ server });

// 连接管理
const clients = new Map();

wss.on('connection', (ws, req) => {
  const userId = generateUserId();
  clients.set(userId, ws);
  
  console.log(`User ${userId} connected. Total: ${clients.size}`);
  
  // 发送欢迎消息
  ws.send(JSON.stringify({
    type: 'welcome',
    userId,
    message: 'Connected successfully'
  }));
  
  // 接收消息
  ws.on('message', (data) => {
    try {
      const message = JSON.parse(data);
      handleMessage(ws, userId, message);
    } catch (e) {
      ws.send(JSON.stringify({ type: 'error', message: 'Invalid JSON' }));
    }
  });
  
  // 心跳检测
  ws.isAlive = true;
  ws.on('pong', () => { ws.isAlive = true; });
  
  // 断开连接
  ws.on('close', () => {
    clients.delete(userId);
    broadcast({ type: 'user_left', userId });
    console.log(`User ${userId} disconnected`);
  });
});

// 心跳检测
setInterval(() => {
  wss.clients.forEach((ws) => {
    if (!ws.isAlive) return ws.terminate();
    ws.isAlive = false;
    ws.ping();
  });
}, 30000);

// 广播消息
function broadcast(message) {
  const data = JSON.stringify(message);
  wss.clients.forEach((client) => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(data);
    }
  });
}

server.listen(8080, () => {
  console.log('WebSocket server running on port 8080');
});

Python实现

import asyncio
import websockets
import json
from datetime import datetime

connected_clients = set()

async def handler(websocket, path):
    # 注册客户端
    connected_clients.add(websocket)
    client_id = id(websocket)
    
    try:
        # 发送欢迎消息
        await websocket.send(json.dumps({
            'type': 'welcome',
            'clientId': client_id,
            'timestamp': datetime.now().isoformat()
        }))
        
        # 广播新用户加入
        await broadcast({
            'type': 'user_joined',
            'clientId': client_id,
            'count': len(connected_clients)
        })
        
        # 处理消息
        async for message in websocket:
            try:
                data = json.loads(message)
                await handle_message(websocket, data)
            except json.JSONDecodeError:
                await websocket.send(json.dumps({
                    'type': 'error',
                    'message': 'Invalid JSON'
                }))
                
    except websockets.exceptions.ConnectionClosed:
        pass
    finally:
        # 移除客户端
        connected_clients.remove(websocket)
        await broadcast({
            'type': 'user_left',
            'clientId': client_id,
            'count': len(connected_clients)
        })

async def broadcast(message):
    if connected_clients:
        await asyncio.gather(*[
            client.send(json.dumps(message))
            for client in connected_clients
        ])

async def handle_message(websocket, data):
    msg_type = data.get('type')
    
    if msg_type == 'chat':
        await broadcast({
            'type': 'chat',
            'clientId': id(websocket),
            'content': data.get('content'),
            'timestamp': datetime.now().isoformat()
        })
    elif msg_type == 'ping':
        await websocket.send(json.dumps({'type': 'pong'}))

# 启动服务器
start_server = websockets.serve(handler, 'localhost', 8080)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

客户端实现

浏览器原生API

class WebSocketClient {
  constructor(url) {
    this.url = url;
    this.ws = null;
    this.reconnectAttempts = 0;
    this.maxReconnectAttempts = 5;
    this.reconnectDelay = 1000;
    this.listeners = new Map();
  }
  
  connect() {
    this.ws = new WebSocket(this.url);
    
    this.ws.onopen = () => {
      console.log('Connected');
      this.reconnectAttempts = 0;
      this.emit('connected');
    };
    
    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data);
        this.emit('message', data);
      } catch (e) {
        console.error('Parse error:', e);
      }
    };
    
    this.ws.onerror = (error) => {
      console.error('WebSocket error:', error);
      this.emit('error', error);
    };
    
    this.ws.onclose = () => {
      console.log('Disconnected');
      this.emit('disconnected');
      this.attemptReconnect();
    };
  }
  
  send(data) {
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(JSON.stringify(data));
    }
  }
  
  attemptReconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++;
      const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
      console.log(`Reconnecting in ${delay}ms...`);
      setTimeout(() => this.connect(), delay);
    }
  }
  
  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, []);
    }
    this.listeners.get(event).push(callback);
  }
  
  emit(event, data) {
    if (this.listeners.has(event)) {
      this.listeners.get(event).forEach(cb => cb(data));
    }
  }
}

// 使用示例
const client = new WebSocketClient('ws://localhost:8080');
client.on('message', (data) => {
  console.log('Received:', data);
});
client.connect();

实战:实时聊天应用

服务端

const rooms = new Map();

wss.on('connection', (ws) => {
  let currentRoom = null;
  let username = null;
  
  ws.on('message', (data) => {
    const msg = JSON.parse(data);
    
    switch (msg.type) {
      case 'join':
        username = msg.username;
        currentRoom = msg.room;
        
        if (!rooms.has(currentRoom)) {
          rooms.set(currentRoom, new Set());
        }
        rooms.get(currentRoom).add(ws);
        
        broadcastToRoom(currentRoom, {
          type: 'system',
          message: `${username} joined the room`
        });
        break;
        
      case 'chat':
        if (currentRoom) {
          broadcastToRoom(currentRoom, {
            type: 'chat',
            username,
            content: msg.content,
            timestamp: Date.now()
          });
        }
        break;
        
      case 'typing':
        if (currentRoom) {
          broadcastToRoom(currentRoom, {
            type: 'typing',
            username
          }, ws);
        }
        break;
    }
  });
  
  ws.on('close', () => {
    if (currentRoom && rooms.has(currentRoom)) {
      rooms.get(currentRoom).delete(ws);
      broadcastToRoom(currentRoom, {
        type: 'system',
        message: `${username} left the room`
      });
    }
  });
});

function broadcastToRoom(roomId, message, excludeWs = null) {
  const room = rooms.get(roomId);
  if (!room) return;
  
  const data = JSON.stringify(message);
  room.forEach((client) => {
    if (client !== excludeWs && client.readyState === WebSocket.OPEN) {
      client.send(data);
    }
  });
}

最佳实践

  1. 心跳检测:定期ping/pong检测连接状态
  2. 断线重连:实现指数退避重连策略
  3. 消息确认:重要消息需要ACK机制
  4. 安全性:使用wss协议,验证token
  5. 限流:防止消息洪水攻击
  6. 监控:记录连接数和消息量

WebSocket是实现实时应用的利器,合理使用能够构建流畅的实时交互体验。

客户端 服务器 请求 推送 WebSocket全双工通信

本文链接:https://www.kkkliao.cn/?id=751 转载需授权!

分享到:

版权声明:本文由廖万里的博客发布,如需转载请注明出处。


发表评论

访客

看不清,换一张

◎欢迎参与讨论,请在这里发表您的看法和观点。