Skip to main content

WebSocket协议实现指南 (RFC 6455)

本文档是RFC 6455的详细技术指南和实现参考,包含协议说明、代码示例和最佳实践。如需查看RFC官方章节翻译,请参考各章节文档。

RFC 6455 - The WebSocket Protocol

互联网工程任务组 (IETF)
请求评论: 6455
类别: 标准跟踪

作者:
I. Fette (Google Inc.)
A. Melnikov (Isode Ltd.)

发布日期: 2011年12月


本备忘录的状态

这是一份互联网标准跟踪文档。

本文档是互联网工程任务组 (IETF) 的产品,代表了IETF社区的共识。

版权声明

Copyright (c) 2011 IETF Trust and the persons identified as the document authors. All rights reserved.


摘要

WebSocket协议 (WebSocket Protocol) 在单个TCP连接上实现客户端和服务器之间的全双工通信 (Full-Duplex Communication)。WebSocket协议旨在被在Web浏览器和Web服务器中实现,但它可以被任何客户端或服务器应用程序使用。

WebSocket协议是一个独立的基于TCP的协议。它与HTTP的唯一关系是其握手 (Handshake) 被HTTP服务器解释为升级请求 (Upgrade Request)。

通过设计,WebSocket协议旨在在现有的HTTP基础设施上工作,因此它使用HTTP端口80和443,并支持HTTP代理和中间件,即使这意味着某些复杂性。


目录


WebSocket核心术语表

基本概念

英文术语中文译法说明
WebSocket ProtocolWebSocket协议在单个TCP连接上实现全双工通信的协议
Full-Duplex Communication全双工通信双向同时通信
Opening Handshake开放握手建立WebSocket连接的HTTP升级过程
Closing Handshake关闭握手优雅关闭WebSocket连接的过程
FrameWebSocket数据传输的基本单位
Message消息由一个或多个帧组成的完整应用数据
Client客户端发起WebSocket连接的一方(通常是浏览器)
Server服务器接受WebSocket连接的一方
Endpoint端点客户端或服务器
Connection连接客户端和服务器之间的WebSocket连接

握手相关

英文术语中文译法说明
Upgrade Request升级请求HTTP升级到WebSocket的请求
Sec-WebSocket-KeyWebSocket密钥客户端发送的随机密钥
Sec-WebSocket-AcceptWebSocket接受服务器响应的验证密钥
Sec-WebSocket-ProtocolWebSocket子协议协商的应用层子协议
Sec-WebSocket-ExtensionsWebSocket扩展协商的协议扩展
Sec-WebSocket-VersionWebSocket版本协议版本号(当前为13)
Origin发起连接的Web页面来源

帧结构相关

英文术语中文译法说明
FIN结束标志位标识这是消息的最后一帧
RSV保留位预留给扩展使用的标志位
Opcode操作码定义帧的类型
Mask掩码客户端到服务器的数据必须掩码
Masking-key掩码密钥用于掩码数据的32位密钥
Payload Length有效载荷长度数据长度
Payload Data有效载荷数据实际传输的数据
Extension Data扩展数据扩展协商的额外数据
Application Data应用数据应用层的实际数据

帧类型

英文术语中文译法Opcode说明
Continuation Frame延续帧0x0消息的后续帧
Text Frame文本帧0x1UTF-8编码的文本数据
Binary Frame二进制帧0x2二进制数据
Close Frame关闭帧0x8连接关闭控制帧
Ping FramePing帧0x9心跳检测请求
Pong FramePong帧0xA心跳检测响应
Control Frame控制帧0x8-0xF用于控制连接的帧
Data Frame数据帧0x1-0x2用于传输应用数据的帧

关闭相关

英文术语中文译法说明
Close Code关闭代码指示连接关闭原因的数字代码
Close Reason关闭原因关闭的文本描述
Normal Closure正常关闭代码1000,正常完成目的后关闭
Going Away离开代码1001,端点离开(如页面导航)
Protocol Error协议错误代码1002,协议违规
Unsupported Data不支持的数据代码1003,收到无法接受的数据类型
Invalid Frame Payload Data无效帧载荷数据代码1007,数据不一致
Policy Violation策略违规代码1008,违反策略
Message Too Big消息过大代码1009,消息太大无法处理
TLS Handshake FailureTLS握手失败代码1015,TLS握手失败

文档结构说明

第1节: 引言与概述

  • WebSocket产生的背景和需求
  • 协议的设计目标和原则
  • 与HTTP/TCP的关系
  • 安全模型和设计理念

第2-3节: 基础要求

  • 符合性要求和术语定义
  • WebSocket URI格式 (ws:// 和 wss://)

第4节: 开放握手(核心)

  • 客户端如何发起握手
  • 服务器如何响应握手
  • 子协议和扩展的协商
  • 多版本支持

第5节: 数据帧(核心)

  • 帧结构详细说明
  • 掩码机制
  • 分片(大消息分多帧传输)
  • 控制帧和数据帧

第6-7节: 数据传输和连接关闭

  • 发送和接收消息的规则
  • 正常关闭流程
  • 异常处理
  • 关闭状态码

第8-10节: 扩展、错误和安全

  • 扩展机制
  • 错误处理
  • 详细的安全考虑

WebSocket连接建立流程

1. 客户端发起HTTP升级请求

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat

2. 服务器响应升级

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat

3. 连接建立,开始双向通信

Client ←→ Server
↓ ↓
Text/Binary Frames
Ping/Pong Frames
Close Frame

4. 关闭连接

Client → Server: Close Frame (Code: 1000)
Server → Client: Close Frame (Code: 1000)
TCP连接关闭

握手密钥计算

Sec-WebSocket-Accept计算方法

// 1. 客户端生成随机的Sec-WebSocket-Key (Base64编码的16字节随机数)
const clientKey = "dGhlIHNhbXBsZSBub25jZQ==";

// 2. 魔术字符串 (RFC 6455定义的固定GUID)
const magicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";

// 3. 拼接并计算SHA-1哈希
const concatenated = clientKey + magicString;
const hash = SHA1(concatenated);

// 4. Base64编码得到Sec-WebSocket-Accept
const serverAccept = Base64Encode(hash);
// 结果: "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="

这个机制确保:

  • 服务器理解WebSocket协议(而非普通HTTP服务器)
  • 防止缓存代理返回错误的响应
  • 提供基本的握手验证

WebSocket帧结构详解

 0                   1                   2                   3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+

字段说明

FIN (1 bit):

  • 0 = 还有后续帧
  • 1 = 这是消息的最后(或唯一)帧

RSV1, RSV2, RSV3 (各1 bit):

  • 预留给扩展使用
  • 如果未协商扩展,必须为0

Opcode (4 bits):

  • 0x0 = Continuation (延续帧)
  • 0x1 = Text (文本帧,UTF-8)
  • 0x2 = Binary (二进制帧)
  • 0x8 = Close (关闭)
  • 0x9 = Ping
  • 0xA = Pong
  • 0x3-0x7, 0xB-0xF = 保留

MASK (1 bit):

  • 客户端到服务器: 必须为1
  • 服务器到客户端: 必须为0

Payload Length (7 bits, 7+16 bits, 或 7+64 bits):

  • 0-125: 实际长度
  • 126: 后续16位为实际长度
  • 127: 后续64位为实际长度

Masking-key (0 或 4 bytes):

  • 如果MASK=1,包含32位掩码密钥

Payload Data:

  • Extension data + Application data

掩码机制说明

为什么需要掩码?

安全原因: 防止缓存投毒攻击 (Cache Poisoning Attack)。某些中间代理可能错误地缓存WebSocket帧,掩码确保数据不可预测。

掩码算法

// 客户端发送数据时
function maskData(data, maskingKey) {
const masked = new Uint8Array(data.length);
for (let i = 0; i < data.length; i++) {
masked[i] = data[i] ^ maskingKey[i % 4];
}
return masked;
}

// 服务器接收数据时(使用相同算法解码)
function unmaskData(maskedData, maskingKey) {
return maskData(maskedData, maskingKey); // XOR是自逆运算
}

关键点:

  • 使用XOR运算(^)
  • 密钥长度为4字节
  • 循环使用密钥
  • 客户端→服务器: 必须掩码
  • 服务器→客户端: 禁止掩码

消息分片 (Fragmentation)

大消息可以分成多个帧发送:

示例: 发送一个大的文本消息

帧1: FIN=0, Opcode=0x1 (Text), Payload="Hello "
帧2: FIN=0, Opcode=0x0 (Continuation), Payload="World"
帧3: FIN=1, Opcode=0x0 (Continuation), Payload="!"

最终消息: "Hello World!"

规则:

  • 第一帧: FIN=0, Opcode=数据类型 (0x1或0x2)
  • 中间帧: FIN=0, Opcode=0x0 (Continuation)
  • 最后帧: FIN=1, Opcode=0x0 (Continuation)
  • 控制帧不能分片,且可以插入在分片的数据帧之间

控制帧详解

Close帧 (0x8)

结构:
+--------+--------+------------------+
| Code | Reason |
| (2字节) | (UTF-8文本) |
+--------+--------+------------------+

示例:
Close Code: 1000 (正常关闭)
Close Reason: "Going away"

常用关闭代码:

  • 1000: Normal Closure(正常关闭)
  • 1001: Going Away(端点离开,如页面导航)
  • 1002: Protocol Error(协议错误)
  • 1003: Unsupported Data(不支持的数据类型)
  • 1006: Abnormal Closure(异常关闭,没有发送Close帧)
  • 1009: Message Too Big(消息过大)
  • 1011: Internal Server Error(服务器内部错误)

Ping帧 (0x9)

  • 可以由客户端或服务器发送
  • 用于心跳检测(保持连接活跃)
  • 可以携带应用数据(最多125字节)
  • 接收方必须用Pong帧响应

Pong帧 (0xA)

  • 用于响应Ping帧
  • 必须包含Ping帧中的相同Payload
  • 也可以主动发送(单向心跳)

关闭流程

正常关闭 (Clean Close)

1. 发起方发送Close帧
Client → Server: Close Frame (Code: 1000)

2. 接收方响应Close帧
Server → Client: Close Frame (Code: 1000)

3. 发起方关闭TCP连接
Client: 关闭TCP连接

4. 接收方也关闭TCP连接
Server: 关闭TCP连接

异常关闭

  • TCP连接突然断开(没有Close帧)
  • 收到无效的帧数据
  • 超时未收到响应
  • 违反协议规则

状态码: 1006 (Abnormal Closure) - 这个代码不会在Close帧中发送,仅用于报告


WebSocket URI格式

ws:// (非加密)

ws://example.com/socket
ws://example.com:8080/chat
ws://192.168.1.1/data
  • 默认端口: 80
  • 等价于 http:// 的安全级别

wss:// (TLS加密)

wss://example.com/socket
wss://secure.example.com:443/chat
  • 默认端口: 443
  • 等价于 https:// 的安全级别
  • 生产环境强烈推荐使用wss://

子协议 (Subprotocol)

WebSocket是传输层协议,子协议定义应用层的消息格式:

客户端请求:
Sec-WebSocket-Protocol: chat, superchat

服务器响应:
Sec-WebSocket-Protocol: chat

建立的连接使用"chat"子协议

常见子协议:

  • STOMP: 简单文本消息协议
  • MQTT: 物联网消息协议
  • WAMP: Web应用消息协议
  • 自定义应用协议

扩展 (Extensions)

扩展可以增强WebSocket功能:

客户端请求:
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits

服务器响应:
Sec-WebSocket-Extensions: permessage-deflate

常用扩展:

  • permessage-deflate: 消息压缩(RFC 7692)
  • permessage-bzip2: Bzip2压缩
  • 自定义扩展

安全考虑要点

1. 源验证 (Origin Validation)

// 服务器端验证Origin
const allowedOrigins = ['https://example.com', 'https://app.example.com'];
const origin = request.headers['origin'];

if (!allowedOrigins.includes(origin)) {
// 拒绝连接
response.status(403).send('Forbidden');
}

2. TLS加密

  • 必须使用wss://在生产环境
  • 防止中间人攻击
  • 保护数据机密性和完整性

3. 认证授权

方案1: 在握手时通过Cookie认证
GET /socket HTTP/1.1
Cookie: session=abc123

方案2: 通过子协议传递令牌
ws://example.com/socket?token=jwt_token

方案3: 连接后首条消息携带认证信息

4. 速率限制

  • 限制连接数(防止DoS)
  • 限制消息大小
  • 限制消息频率

5. 输入验证

  • 验证UTF-8编码(文本帧)
  • 验证消息格式
  • 防止注入攻击

实现最佳实践

客户端(浏览器)

// 1. 创建WebSocket连接
const ws = new WebSocket('wss://example.com/socket', ['chat']);

// 2. 监听事件
ws.addEventListener('open', (event) => {
console.log('连接已建立');
ws.send('Hello Server!');
});

ws.addEventListener('message', (event) => {
console.log('收到消息:', event.data);
});

ws.addEventListener('error', (event) => {
console.error('WebSocket错误:', event);
});

ws.addEventListener('close', (event) => {
console.log('连接已关闭', event.code, event.reason);
// 实现重连逻辑
if (event.code !== 1000) {
setTimeout(() => reconnect(), 5000);
}
});

// 3. 发送数据
ws.send('文本消息');
ws.send(new Blob(['二进制数据']));
ws.send(new ArrayBuffer(8));

// 4. 关闭连接
ws.close(1000, '正常关闭');

服务器端(Node.js示例)

const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });

wss.on('connection', (ws, request) => {
// 验证Origin
const origin = request.headers.origin;
// ...验证逻辑

// 心跳检测
ws.isAlive = true;
ws.on('pong', () => { ws.isAlive = true; });

// 接收消息
ws.on('message', (data) => {
console.log('收到:', data);

// 广播给所有客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});

// 错误处理
ws.on('error', (error) => {
console.error('错误:', error);
});

// 关闭
ws.on('close', (code, reason) => {
console.log('连接关闭:', code, reason);
});
});

// 心跳检测
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);

常见应用场景

1. 实时聊天应用

  • 即时消息传递
  • 在线状态显示
  • 输入指示器

2. 实时协作工具

  • 文档协同编辑(如Google Docs)
  • 白板共享
  • 代码协作编辑器

3. 实时数据流

  • 股票行情
  • 体育比分
  • 物联网数据

4. 在线游戏

  • 多人在线游戏
  • 实时游戏状态同步
  • 低延迟交互

5. 推送通知

  • 浏览器推送
  • 实时警报
  • 系统监控

与其他技术的比较

WebSocket vs HTTP Long Polling

特性WebSocketLong Polling
连接持久连接重复的HTTP请求
延迟极低较高(HTTP开销)
双向真正双向伪双向
资源低(一个连接)高(多次连接)
复杂度中等较简单

WebSocket vs Server-Sent Events (SSE)

特性WebSocketSSE
双向通信✅ 双向❌ 仅服务器→客户端
数据格式二进制或文本文本(UTF-8)
协议独立协议基于HTTP
浏览器支持广泛广泛(除IE)
重连手动实现自动重连

调试工具

浏览器开发者工具

  • Chrome DevTools → Network → WS
  • 查看帧的发送和接收
  • 查看连接状态

命令行工具

# wscat (Node.js)
npm install -g wscat
wscat -c ws://echo.websocket.org

# websocat (Rust)
websocat ws://echo.websocket.org

在线测试工具

  • websocket.org Echo Test
  • Hoppscotch (原Postwoman)
  • Firecamp

参考资源

相关RFC标准

  • RFC 6455: WebSocket协议(本文档)
  • RFC 7692: WebSocket压缩扩展
  • RFC 8441: HTTP/2上的WebSocket引导
  • RFC 8307: WebSocket的Well-Known URI

实现库推荐

JavaScript (浏览器):

  • 原生WebSocket API

Node.js:

  • ws (轻量级)
  • Socket.IO (功能丰富,自动降级)
  • uWebSockets.js (高性能)

Python:

  • websockets (asyncio)
  • ws4py
  • Tornado

Java:

  • Java WebSocket API (JSR 356)
  • Netty
  • Spring WebSocket

Go:

  • gorilla/websocket
  • nhooyr/websocket

下一步学习建议

  1. 基础: 理解WebSocket握手和帧结构
  2. 实践: 实现简单的聊天应用
  3. 进阶: 学习子协议和扩展
  4. 优化: 实现心跳、重连、错误处理
  5. 安全: 深入理解安全考虑和最佳实践


实战示例:完整的聊天应用

客户端实现(完整版)

class WebSocketClient {
constructor(url, protocols = []) {
this.url = url;
this.protocols = protocols;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.heartbeatInterval = null;
this.messageQueue = [];
}

connect() {
try {
this.ws = new WebSocket(this.url, this.protocols);
this.setupEventHandlers();
} catch (error) {
console.error('连接失败:', error);
this.handleReconnect();
}
}

setupEventHandlers() {
this.ws.onopen = (event) => {
console.log('✅ WebSocket连接已建立');
this.reconnectAttempts = 0;

// 发送积压的消息
this.flushMessageQueue();

// 启动心跳
this.startHeartbeat();

// 触发自定义事件
this.onConnectionOpen && this.onConnectionOpen(event);
};

this.ws.onmessage = (event) => {
console.log('📨 收到消息:', event.data);

try {
// 尝试解析JSON
const data = JSON.parse(event.data);
this.onMessage && this.onMessage(data);
} catch {
// 普通文本消息
this.onMessage && this.onMessage(event.data);
}
};

this.ws.onerror = (event) => {
console.error('❌ WebSocket错误:', event);
this.onError && this.onError(event);
};

this.ws.onclose = (event) => {
console.log('🔌 连接已关闭', {
code: event.code,
reason: event.reason,
wasClean: event.wasClean
});

// 停止心跳
this.stopHeartbeat();

// 根据关闭码决定是否重连
if (event.code !== 1000 && event.code !== 1001) {
this.handleReconnect();
}

this.onConnectionClose && this.onConnectionClose(event);
};
}

send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
const message = typeof data === 'object'
? JSON.stringify(data)
: data;
this.ws.send(message);
return true;
} else {
// 连接未就绪,加入队列
this.messageQueue.push(data);
return false;
}
}

flushMessageQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
this.send(message);
}
}

startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.send({ type: 'ping', timestamp: Date.now() });
}
}, 30000); // 30秒
}

stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}

handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);

console.log(`🔄 ${delay}ms后尝试第${this.reconnectAttempts}次重连...`);

setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('❌ 已达到最大重连次数');
this.onReconnectFailed && this.onReconnectFailed();
}
}

close(code = 1000, reason = '') {
this.stopHeartbeat();
if (this.ws) {
this.ws.close(code, reason);
}
}

getState() {
if (!this.ws) return 'CLOSED';
const states = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
return states[this.ws.readyState];
}
}

// 使用示例
const client = new WebSocketClient('wss://chat.example.com', ['chat-v1']);

client.onConnectionOpen = () => {
console.log('连接成功,可以发送消息了');
client.send({ type: 'join', room: 'general', user: 'Alice' });
};

client.onMessage = (data) => {
if (data.type === 'message') {
displayMessage(data);
} else if (data.type === 'pong') {
console.log('收到心跳响应');
}
};

client.onError = (event) => {
console.error('发生错误,请检查网络连接');
};

client.connect();

// 发送聊天消息
function sendChatMessage(text) {
client.send({
type: 'message',
content: text,
timestamp: Date.now()
});
}

服务器实现(Node.js - 生产级)

const WebSocket = require('ws');
const http = require('http');
const url = require('url');

class WebSocketServer {
constructor(options = {}) {
this.port = options.port || 8080;
this.clients = new Map();
this.rooms = new Map();
this.server = http.createServer();
this.wss = new WebSocket.Server({
server: this.server,
verifyClient: this.verifyClient.bind(this)
});

this.setupServer();
}

verifyClient(info, callback) {
// Origin验证
const allowedOrigins = [
'https://example.com',
'https://app.example.com'
];

const origin = info.origin || info.req.headers.origin;

if (!origin || !allowedOrigins.includes(origin)) {
callback(false, 403, 'Origin not allowed');
return;
}

// Token验证(从URL参数)
const params = url.parse(info.req.url, true).query;
if (!params.token || !this.validateToken(params.token)) {
callback(false, 401, 'Invalid token');
return;
}

callback(true);
}

validateToken(token) {
// 实际应该验证JWT等
return token && token.length > 0;
}

setupServer() {
this.wss.on('connection', (ws, request) => {
const clientId = this.generateClientId();
const clientInfo = {
id: clientId,
ws: ws,
ip: request.socket.remoteAddress,
connectedAt: new Date(),
isAlive: true,
user: null,
room: null
};

this.clients.set(clientId, clientInfo);
console.log(`✅ 新连接: ${clientId} from ${clientInfo.ip}`);
console.log(`📊 当前连接数: ${this.clients.size}`);

// 心跳检测
ws.on('pong', () => {
clientInfo.isAlive = true;
});

// 消息处理
ws.on('message', (data) => {
this.handleMessage(clientId, data);
});

// 错误处理
ws.on('error', (error) => {
console.error(`❌ 客户端${clientId}错误:`, error);
});

// 关闭处理
ws.on('close', (code, reason) => {
console.log(`🔌 客户端${clientId}断开: ${code} - ${reason}`);
this.handleClientDisconnect(clientId);
});

// 欢迎消息
this.sendToClient(clientId, {
type: 'welcome',
clientId: clientId,
serverTime: Date.now()
});
});

// 定期心跳检测
this.startHeartbeat();
}

handleMessage(clientId, data) {
try {
const message = JSON.parse(data);
const client = this.clients.get(clientId);

switch (message.type) {
case 'ping':
this.sendToClient(clientId, { type: 'pong', timestamp: Date.now() });
break;

case 'join':
this.handleJoinRoom(clientId, message.room, message.user);
break;

case 'message':
this.handleChatMessage(clientId, message);
break;

case 'leave':
this.handleLeaveRoom(clientId);
break;

default:
console.warn(`未知消息类型: ${message.type}`);
}
} catch (error) {
console.error('消息处理错误:', error);
this.sendToClient(clientId, {
type: 'error',
message: 'Invalid message format'
});
}
}

handleJoinRoom(clientId, roomName, userName) {
const client = this.clients.get(clientId);
if (!client) return;

// 离开当前房间
if (client.room) {
this.handleLeaveRoom(clientId);
}

// 加入新房间
if (!this.rooms.has(roomName)) {
this.rooms.set(roomName, new Set());
}

this.rooms.get(roomName).add(clientId);
client.room = roomName;
client.user = userName;

console.log(`👤 ${userName} 加入房间 ${roomName}`);

// 通知房间内其他人
this.broadcastToRoom(roomName, {
type: 'user_joined',
user: userName,
timestamp: Date.now()
}, clientId);

// 发送房间信息给新用户
const roomUsers = Array.from(this.rooms.get(roomName))
.map(id => this.clients.get(id)?.user)
.filter(u => u);

this.sendToClient(clientId, {
type: 'room_joined',
room: roomName,
users: roomUsers
});
}

handleChatMessage(clientId, message) {
const client = this.clients.get(clientId);
if (!client || !client.room) return;

// 广播消息到房间
this.broadcastToRoom(client.room, {
type: 'message',
from: client.user,
content: message.content,
timestamp: Date.now()
});
}

handleLeaveRoom(clientId) {
const client = this.clients.get(clientId);
if (!client || !client.room) return;

const roomName = client.room;
const room = this.rooms.get(roomName);

if (room) {
room.delete(clientId);

// 通知房间内其他人
this.broadcastToRoom(roomName, {
type: 'user_left',
user: client.user,
timestamp: Date.now()
});

// 如果房间为空,删除房间
if (room.size === 0) {
this.rooms.delete(roomName);
console.log(`🗑️ 房间 ${roomName} 已删除(无用户)`);
}
}

client.room = null;
}

handleClientDisconnect(clientId) {
this.handleLeaveRoom(clientId);
this.clients.delete(clientId);
console.log(`📊 当前连接数: ${this.clients.size}`);
}

sendToClient(clientId, data) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(JSON.stringify(data));
}
}

broadcastToRoom(roomName, data, excludeClientId = null) {
const room = this.rooms.get(roomName);
if (!room) return;

const message = JSON.stringify(data);
room.forEach(clientId => {
if (clientId !== excludeClientId) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(message);
}
}
});
}

startHeartbeat() {
setInterval(() => {
this.clients.forEach((client, clientId) => {
if (client.isAlive === false) {
console.log(`💀 客户端${clientId}心跳超时,断开连接`);
client.ws.terminate();
return;
}

client.isAlive = false;
client.ws.ping();
});
}, 30000);
}

generateClientId() {
return `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}

start() {
this.server.listen(this.port, () => {
console.log(`🚀 WebSocket服务器运行在端口 ${this.port}`);
});
}

shutdown() {
console.log('🛑 正在关闭服务器...');

// 通知所有客户端
this.clients.forEach((client) => {
client.ws.close(1001, 'Server shutting down');
});

// 关闭服务器
this.wss.close(() => {
this.server.close(() => {
console.log('✅ 服务器已关闭');
});
});
}
}

// 启动服务器
const server = new WebSocketServer({ port: 8080 });
server.start();

// 优雅关闭
process.on('SIGTERM', () => server.shutdown());
process.on('SIGINT', () => server.shutdown());

Python服务器实现(asyncio)

import asyncio
import websockets
import json
import time
from collections import defaultdict

class WebSocketChatServer:
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.clients = {}
self.rooms = defaultdict(set)

async def register(self, websocket, client_id):
"""注册新客户端"""
self.clients[client_id] = {
'ws': websocket,
'user': None,
'room': None,
'connected_at': time.time()
}
print(f"✅ 新连接: {client_id}")

async def unregister(self, client_id):
"""注销客户端"""
client = self.clients.get(client_id)
if client and client['room']:
await self.handle_leave_room(client_id)

if client_id in self.clients:
del self.clients[client_id]
print(f"🔌 断开: {client_id}")

async def handle_client(self, websocket, path):
"""处理单个客户端连接"""
client_id = f"client_{int(time.time())}_{id(websocket)}"
await self.register(websocket, client_id)

try:
# 发送欢迎消息
await websocket.send(json.dumps({
'type': 'welcome',
'client_id': client_id,
'timestamp': time.time()
}))

# 消息循环
async for message in websocket:
await self.handle_message(client_id, message)

except websockets.exceptions.ConnectionClosed:
print(f"连接关闭: {client_id}")
finally:
await self.unregister(client_id)

async def handle_message(self, client_id, message):
"""处理接收到的消息"""
try:
data = json.loads(message)
msg_type = data.get('type')

if msg_type == 'ping':
await self.send_to_client(client_id, {
'type': 'pong',
'timestamp': time.time()
})

elif msg_type == 'join':
await self.handle_join_room(
client_id,
data.get('room'),
data.get('user')
)

elif msg_type == 'message':
await self.handle_chat_message(client_id, data)

elif msg_type == 'leave':
await self.handle_leave_room(client_id)

except json.JSONDecodeError:
print(f"无效的JSON消息从 {client_id}")
except Exception as e:
print(f"处理消息错误: {e}")

async def handle_join_room(self, client_id, room_name, user_name):
"""处理加入房间"""
client = self.clients.get(client_id)
if not client:
return

# 离开当前房间
if client['room']:
await self.handle_leave_room(client_id)

# 加入新房间
self.rooms[room_name].add(client_id)
client['room'] = room_name
client['user'] = user_name

print(f"👤 {user_name} 加入房间 {room_name}")

# 通知房间其他成员
await self.broadcast_to_room(room_name, {
'type': 'user_joined',
'user': user_name,
'timestamp': time.time()
}, exclude=client_id)

# 发送房间信息
room_users = [
self.clients[cid]['user']
for cid in self.rooms[room_name]
if self.clients[cid]['user']
]

await self.send_to_client(client_id, {
'type': 'room_joined',
'room': room_name,
'users': room_users
})

async def handle_chat_message(self, client_id, data):
"""处理聊天消息"""
client = self.clients.get(client_id)
if not client or not client['room']:
return

await self.broadcast_to_room(client['room'], {
'type': 'message',
'from': client['user'],
'content': data.get('content'),
'timestamp': time.time()
})

async def handle_leave_room(self, client_id):
"""处理离开房间"""
client = self.clients.get(client_id)
if not client or not client['room']:
return

room_name = client['room']

# 从房间移除
self.rooms[room_name].discard(client_id)

# 通知其他成员
await self.broadcast_to_room(room_name, {
'type': 'user_left',
'user': client['user'],
'timestamp': time.time()
})

# 清空房间信息
client['room'] = None

# 删除空房间
if len(self.rooms[room_name]) == 0:
del self.rooms[room_name]
print(f"🗑️ 房间 {room_name} 已删除")

async def send_to_client(self, client_id, data):
"""发送消息给单个客户端"""
client = self.clients.get(client_id)
if client:
await client['ws'].send(json.dumps(data))

async def broadcast_to_room(self, room_name, data, exclude=None):
"""广播消息到房间"""
room = self.rooms.get(room_name)
if not room:
return

message = json.dumps(data)
tasks = []

for client_id in room:
if client_id != exclude:
client = self.clients.get(client_id)
if client:
tasks.append(client['ws'].send(message))

if tasks:
await asyncio.gather(*tasks, return_exceptions=True)

async def start(self):
"""启动服务器"""
async with websockets.serve(
self.handle_client,
self.host,
self.port,
ping_interval=30,
ping_timeout=10
):
print(f"🚀 WebSocket服务器运行在 ws://{self.host}:{self.port}")
await asyncio.Future() # 永久运行

# 启动
if __name__ == '__main__':
server = WebSocketChatServer()
asyncio.run(server.start())

性能优化技巧

1. 消息批处理

class BatchedWebSocket {
constructor(url) {
this.ws = new WebSocket(url);
this.messageQueue = [];
this.batchSize = 10;
this.batchTimeout = 100; // ms
this.timer = null;
}

send(data) {
this.messageQueue.push(data);

if (this.messageQueue.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.batchTimeout);
}
}

flush() {
if (this.messageQueue.length === 0) return;

this.ws.send(JSON.stringify({
type: 'batch',
messages: this.messageQueue
}));

this.messageQueue = [];
clearTimeout(this.timer);
this.timer = null;
}
}

2. 消息压缩

// 使用permessage-deflate扩展
const ws = new WebSocket('wss://example.com', {
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 1024,
memLevel: 7,
level: 3
},
threshold: 1024 // 只压缩>1KB的消息
}
});

3. 二进制帧优化

// 发送二进制数据(比JSON更高效)
const data = new Uint8Array([1, 2, 3, 4, 5]);
ws.send(data.buffer);

// 接收二进制数据
ws.binaryType = 'arraybuffer';
ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
const view = new Uint8Array(event.data);
// 处理二进制数据
}
};

监控和指标

连接统计

class WebSocketMonitor {
constructor(wss) {
this.wss = wss;
this.metrics = {
totalConnections: 0,
activeConnections: 0,
messagesReceived: 0,
messagesSent: 0,
bytesReceived: 0,
bytesSent: 0,
errors: 0
};

this.startMonitoring();
}

startMonitoring() {
setInterval(() => {
this.metrics.activeConnections = this.wss.clients.size;
this.report();
}, 60000); // 每分钟报告
}

report() {
console.log('📊 WebSocket统计:');
console.log(` 活跃连接: ${this.metrics.activeConnections}`);
console.log(` 总连接数: ${this.metrics.totalConnections}`);
console.log(` 消息接收: ${this.metrics.messagesReceived}`);
console.log(` 消息发送: ${this.metrics.messagesSent}`);
console.log(` 接收字节: ${(this.metrics.bytesReceived / 1024).toFixed(2)} KB`);
console.log(` 发送字节: ${(this.metrics.bytesSent / 1024).toFixed(2)} KB`);
console.log(` 错误次数: ${this.metrics.errors}`);
}
}

总结

WebSocket协议是Web实时通信的基石:

  • : 准确实现双向通信
  • : 清晰的协议规范
  • : 优雅的设计和实现

核心价值

  1. 🔄 真正的全双工通信
  2. ⚡ 低延迟实时数据传输
  3. 📦 轻量级帧协议
  4. 🔐 内置安全机制

最佳使用场景

  • 实时聊天
  • 协作编辑
  • 实时游戏
  • 实时数据流
  • 推送通知

相关RFC

  • RFC 6455: WebSocket协议(本文档)
  • RFC 7692: WebSocket压缩扩展
  • RFC 8441: HTTP/2上的WebSocket引导

返回RFC列表