WebSocket协议实现指南 (RFC 6455)
本文档是RFC 6455的详细技术指南和实现参考,包含协议说明、代码示例和最佳实践。如需查看RFC官方章节翻译,请参考各章节文档。
RFC 6455 - The WebSocket Protocol
互联网工程任务组 (IETF)
请求评论: 6455
类别: 标准跟踪
作者:
I. Fette (Google Inc.)
A. Melnikov (Isode Ltd.)
发布日期: 2011年12月
本备忘录的状态
这是一份互联网标准跟踪文档。
本文档是互联网工程任务组 (IETF) 的产品,代表了IETF社区的共识。
版权声明
Copyright (c) 2011 IETF Trust and the persons identified as the document authors. All rights reserved.
摘要
WebSocket协议 (WebSocket Protocol) 在单个TCP连接上实现客户端和服务器之间的全双工通信 (Full-Duplex Communication)。WebSocket协议旨在被在Web浏览器和Web服务器中实现,但它可以被任何客户端或服务器应用程序使用。
WebSocket协议是一个独立的基于TCP的协议。它与HTTP的唯一关系是其握手 (Handshake) 被HTTP服务器解释为升级请求 (Upgrade Request)。
通过设计,WebSocket协议旨在在现有的HTTP基础设施上工作,因此它使用HTTP端口80和443,并支持HTTP代理和中间件,即使这意味着某些复杂性。
目录
- 1. 引言 (Introduction)
- 2. 符合性要求 (Conformance Requirements)
- 3. WebSocket URI
- 4. 开放握手 (Opening Handshake)
- 5. 数据帧 (Data Framing)
- 6. 发送和接收数据 (Sending and Receiving Data)
- 7. 关闭连接 (Closing the Connection)
- 8. 错误处理 (Error Handling)
- 9. 扩展 (Extensions)
- 10. 安全考虑 (Security Considerations)
- 11. IANA考虑 (IANA Considerations)
- 12. 使用其他规范中的WebSocket协议
- 13. 致谢
- 14. 参考文献
WebSocket核心术语表
基本概念
| 英文术语 | 中文译法 | 说明 |
|---|---|---|
| WebSocket Protocol | WebSocket协议 | 在单个TCP连接上实现全双工通信的协议 |
| Full-Duplex Communication | 全双工通信 | 双向同时通信 |
| Opening Handshake | 开放握手 | 建立WebSocket连接的HTTP升级过程 |
| Closing Handshake | 关闭握手 | 优雅关闭WebSocket连接的过程 |
| Frame | 帧 | WebSocket数据传输的基本单位 |
| Message | 消息 | 由一个或多个帧组成的完整应用数据 |
| Client | 客户端 | 发起WebSocket连接的一方(通常是浏览器) |
| Server | 服务器 | 接受WebSocket连接的一方 |
| Endpoint | 端点 | 客户端或服务器 |
| Connection | 连接 | 客户端和服务器之间的WebSocket连接 |
握手相关
| 英文术语 | 中文译法 | 说明 |
|---|---|---|
| Upgrade Request | 升级请求 | HTTP升级到WebSocket的请求 |
| Sec-WebSocket-Key | WebSocket密钥 | 客户端发送的随机密钥 |
| Sec-WebSocket-Accept | WebSocket接受 | 服务器响应的验证密钥 |
| Sec-WebSocket-Protocol | WebSocket子协议 | 协商的应用层子协议 |
| Sec-WebSocket-Extensions | WebSocket扩展 | 协商的协议扩展 |
| Sec-WebSocket-Version | WebSocket版本 | 协议版本号(当前为13) |
| Origin | 源 | 发起连接的Web页面来源 |
帧结构相关
| 英文术语 | 中文译法 | 说明 |
|---|---|---|
| FIN | 结束标志位 | 标识这是消息的最后一帧 |
| RSV | 保留位 | 预留给扩展使用的标志位 |
| Opcode | 操作码 | 定义帧的类型 |
| Mask | 掩码 | 客户端到服务器的数据必须掩码 |
| Masking-key | 掩码密钥 | 用于掩码数据的32位密钥 |
| Payload Length | 有效载荷长度 | 数据长度 |
| Payload Data | 有效载荷数据 | 实际传输的数据 |
| Extension Data | 扩展数据 | 扩展协商的额外数据 |
| Application Data | 应用数据 | 应用层的实际数据 |
帧类型
| 英文术语 | 中文译法 | Opcode | 说明 |
|---|---|---|---|
| Continuation Frame | 延续帧 | 0x0 | 消息的后续帧 |
| Text Frame | 文本帧 | 0x1 | UTF-8编码的文本数据 |
| Binary Frame | 二进制帧 | 0x2 | 二进制数据 |
| Close Frame | 关闭帧 | 0x8 | 连接关闭控制帧 |
| Ping Frame | Ping帧 | 0x9 | 心跳检测请求 |
| Pong Frame | Pong帧 | 0xA | 心跳检测响应 |
| Control Frame | 控制帧 | 0x8-0xF | 用于控制连接的帧 |
| Data Frame | 数据帧 | 0x1-0x2 | 用于传输应用数据的帧 |
关闭相关
| 英文术语 | 中文译法 | 说明 |
|---|---|---|
| Close Code | 关闭代码 | 指示连接关闭原因的数字代码 |
| Close Reason | 关闭原因 | 关闭的文本描述 |
| Normal Closure | 正常关闭 | 代码1000,正常完成目的后关闭 |
| Going Away | 离开 | 代码1001,端点离开(如页面导航) |
| Protocol Error | 协议错误 | 代码1002,协议违规 |
| Unsupported Data | 不支持的数据 | 代码1003,收到无法接受的数据类型 |
| Invalid Frame Payload Data | 无效帧载荷数据 | 代码1007,数据不一致 |
| Policy Violation | 策略违规 | 代码1008,违反策略 |
| Message Too Big | 消息过大 | 代码1009,消息太大无法处理 |
| TLS Handshake Failure | TLS握手失败 | 代码1015,TLS握手失败 |
文档结构说明
第1节: 引言与概述
- WebSocket产生的背景和需求
- 协议的设计目标和原则
- 与HTTP/TCP的关系
- 安全模型和设计理念
第2-3节: 基础要求
- 符合性要求和术语定义
- WebSocket URI格式 (ws:// 和 wss://)
第4节: 开放握手(核心)
- 客户端如何发起握手
- 服务器如何响应握手
- 子协议和扩展的协商
- 多版本支持
第5节: 数据帧(核心)
- 帧结构详细说明
- 掩码机制
- 分片(大消息分多帧传输)
- 控制帧和数据帧
第6-7节: 数据传输和连接关闭
- 发送和接收消息的规则
- 正常关闭流程
- 异常处理
- 关闭状态码
第8-10节: 扩展、错误和安全
- 扩展机制
- 错误处理
- 详细的安全考虑
WebSocket连接建立流程
1. 客户端发起HTTP升级请求
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
Origin: http://example.com
Sec-WebSocket-Protocol: chat, superchat
2. 服务器响应升级
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Protocol: chat
3. 连接建立,开始双向通信
Client ←→ Server
↓ ↓
Text/Binary Frames
Ping/Pong Frames
Close Frame
4. 关闭连接
Client → Server: Close Frame (Code: 1000)
Server → Client: Close Frame (Code: 1000)
TCP连接关闭
握手密钥计算
Sec-WebSocket-Accept计算方法
// 1. 客户端生成随机的Sec-WebSocket-Key (Base64编码的16字节随机数)
const clientKey = "dGhlIHNhbXBsZSBub25jZQ==";
// 2. 魔术字符串 (RFC 6455定义的固定GUID)
const magicString = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
// 3. 拼接并计算SHA-1哈希
const concatenated = clientKey + magicString;
const hash = SHA1(concatenated);
// 4. Base64编码得到Sec-WebSocket-Accept
const serverAccept = Base64Encode(hash);
// 结果: "s3pPLMBiTxaQ9kYGzzhZRbK+xOo="
这个机制确保:
- 服务器理解WebSocket协议(而非普通HTTP服务器)
- 防止缓存代理返回错误的响应
- 提供基本的握手验证
WebSocket帧结构详解
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
字段说明
FIN (1 bit):
- 0 = 还有后续帧
- 1 = 这是消息的最后(或唯一)帧
RSV1, RSV2, RSV3 (各1 bit):
- 预留给扩展使用
- 如果未协商扩展,必须为0
Opcode (4 bits):
- 0x0 = Continuation (延续帧)
- 0x1 = Text (文本帧,UTF-8)
- 0x2 = Binary (二进制帧)
- 0x8 = Close (关闭)
- 0x9 = Ping
- 0xA = Pong
- 0x3-0x7, 0xB-0xF = 保留
MASK (1 bit):
- 客户端到服务器: 必须为1
- 服务器到客户端: 必须为0
Payload Length (7 bits, 7+16 bits, 或 7+64 bits):
- 0-125: 实际长度
- 126: 后续16位为实际长度
- 127: 后续64位为实际长度
Masking-key (0 或 4 bytes):
- 如果MASK=1,包含32位掩码密钥
Payload Data:
- Extension data + Application data
掩码机制说明
为什么需要掩码?
安全原因: 防止缓存投毒攻击 (Cache Poisoning Attack)。某些中间代理可能错误地缓存WebSocket帧,掩码确保数据不可预测。
掩码算法
// 客户端发送数据时
function maskData(data, maskingKey) {
const masked = new Uint8Array(data.length);
for (let i = 0; i < data.length; i++) {
masked[i] = data[i] ^ maskingKey[i % 4];
}
return masked;
}
// 服务器接收数据时(使用相同算法解码)
function unmaskData(maskedData, maskingKey) {
return maskData(maskedData, maskingKey); // XOR是自逆运算
}
关键点:
- 使用XOR运算(^)
- 密钥长度为4字节
- 循环使用密钥
- 客户端→服务器: 必须掩码
- 服务器→客户端: 禁止掩码
消息分片 (Fragmentation)
大消息可以分成多个帧发送:
示例: 发送一个大的文本消息
帧1: FIN=0, Opcode=0x1 (Text), Payload="Hello "
帧2: FIN=0, Opcode=0x0 (Continuation), Payload="World"
帧3: FIN=1, Opcode=0x0 (Continuation), Payload="!"
最终消息: "Hello World!"
规则:
- 第一帧: FIN=0, Opcode=数据类型 (0x1或0x2)
- 中间帧: FIN=0, Opcode=0x0 (Continuation)
- 最后帧: FIN=1, Opcode=0x0 (Continuation)
- 控制帧不能分片,且可以插入在分片的数据帧之间
控制帧详解
Close帧 (0x8)
结构:
+--------+--------+------------------+
| Code | Reason |
| (2字节) | (UTF-8文本) |
+--------+--------+------------------+
示例:
Close Code: 1000 (正常关闭)
Close Reason: "Going away"
常用关闭代码:
- 1000: Normal Closure(正常关闭)
- 1001: Going Away(端点离开,如页面导航)
- 1002: Protocol Error(协议错误)
- 1003: Unsupported Data(不支持的数据类型)
- 1006: Abnormal Closure(异常关闭,没有发送Close帧)
- 1009: Message Too Big(消息过大)
- 1011: Internal Server Error(服务器内部错误)
Ping帧 (0x9)
- 可以由客户端或服务器发送
- 用于心跳检测(保持连接活跃)
- 可以携带应用数据(最多125字节)
- 接收方必须用Pong帧响应
Pong帧 (0xA)
- 用于响应Ping帧
- 必须包含Ping帧中的相同Payload
- 也可以主动发送(单向心跳)
关闭流程
正常关闭 (Clean Close)
1. 发起方发送Close帧
Client → Server: Close Frame (Code: 1000)
2. 接收方响应Close帧
Server → Client: Close Frame (Code: 1000)
3. 发起方关闭TCP连接
Client: 关闭TCP连接
4. 接收方也关闭TCP连接
Server: 关闭TCP连接
异常关闭
- TCP连接突然断开(没有Close帧)
- 收到无效的帧数据
- 超时未收到响应
- 违反协议规则
状态码: 1006 (Abnormal Closure) - 这个代码不会在Close帧中发送,仅用于报告
WebSocket URI格式
ws:// (非加密)
ws://example.com/socket
ws://example.com:8080/chat
ws://192.168.1.1/data
- 默认端口: 80
- 等价于 http:// 的安全级别
wss:// (TLS加密)
wss://example.com/socket
wss://secure.example.com:443/chat
- 默认端口: 443
- 等价于 https:// 的安全级别
- 生产环境强烈推荐使用wss://
子协议 (Subprotocol)
WebSocket是传输层协议,子协议定义应用层的消息格式:
客户端请求:
Sec-WebSocket-Protocol: chat, superchat
服务器响应:
Sec-WebSocket-Protocol: chat
建立的连接使用"chat"子协议
常见子协议:
- STOMP: 简单文本消息协议
- MQTT: 物联网消息协议
- WAMP: Web应用消息协议
- 自定义应用协议
扩展 (Extensions)
扩展可以增强WebSocket功能:
客户端请求:
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
服务器响应:
Sec-WebSocket-Extensions: permessage-deflate
常用扩展:
- permessage-deflate: 消息压缩(RFC 7692)
- permessage-bzip2: Bzip2压缩
- 自定义扩展
安全考虑要点
1. 源验证 (Origin Validation)
// 服务器端验证Origin
const allowedOrigins = ['https://example.com', 'https://app.example.com'];
const origin = request.headers['origin'];
if (!allowedOrigins.includes(origin)) {
// 拒绝连接
response.status(403).send('Forbidden');
}
2. TLS加密
- 必须使用wss://在生产环境
- 防止中间人攻击
- 保护数据机密性和完整性
3. 认证授权
方案1: 在握手时通过Cookie认证
GET /socket HTTP/1.1
Cookie: session=abc123
方案2: 通过子协议传递令牌
ws://example.com/socket?token=jwt_token
方案3: 连接后首条消息携带认证信息
4. 速率限制
- 限制连接数(防止DoS)
- 限制消息大小
- 限制消息频率
5. 输入验证
- 验证UTF-8编码(文本帧)
- 验证消息格式
- 防止注入攻击
实现最佳实践
客户端(浏览器)
// 1. 创建WebSocket连接
const ws = new WebSocket('wss://example.com/socket', ['chat']);
// 2. 监听事件
ws.addEventListener('open', (event) => {
console.log('连接已建立');
ws.send('Hello Server!');
});
ws.addEventListener('message', (event) => {
console.log('收到消息:', event.data);
});
ws.addEventListener('error', (event) => {
console.error('WebSocket错误:', event);
});
ws.addEventListener('close', (event) => {
console.log('连接已关闭', event.code, event.reason);
// 实现重连逻辑
if (event.code !== 1000) {
setTimeout(() => reconnect(), 5000);
}
});
// 3. 发送数据
ws.send('文本消息');
ws.send(new Blob(['二进制数据']));
ws.send(new ArrayBuffer(8));
// 4. 关闭连接
ws.close(1000, '正常关闭');
服务器端(Node.js示例)
const WebSocket = require('ws');
const wss = new WebSocket.Server({ port: 8080 });
wss.on('connection', (ws, request) => {
// 验证Origin
const origin = request.headers.origin;
// ...验证逻辑
// 心跳检测
ws.isAlive = true;
ws.on('pong', () => { ws.isAlive = true; });
// 接收消息
ws.on('message', (data) => {
console.log('收到:', data);
// 广播给所有客户端
wss.clients.forEach((client) => {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
});
});
// 错误处理
ws.on('error', (error) => {
console.error('错误:', error);
});
// 关闭
ws.on('close', (code, reason) => {
console.log('连接关闭:', code, reason);
});
});
// 心跳检测
const interval = setInterval(() => {
wss.clients.forEach((ws) => {
if (ws.isAlive === false) {
return ws.terminate();
}
ws.isAlive = false;
ws.ping();
});
}, 30000);
常见应用场景
1. 实时聊天应用
- 即时消息传递
- 在线状态显示
- 输入指示器
2. 实时协作工具
- 文档协同编辑(如Google Docs)
- 白板共享
- 代码协作编辑器
3. 实时数据流
- 股票行情
- 体育比分
- 物联网数据
4. 在线游戏
- 多人在线游戏
- 实时游戏状态同步
- 低延迟交互
5. 推送通知
- 浏览器推送
- 实时警报
- 系统监控
与其他技术的比较
WebSocket vs HTTP Long Polling
| 特性 | WebSocket | Long Polling |
|---|---|---|
| 连接 | 持久连接 | 重复的HTTP请求 |
| 延迟 | 极低 | 较高(HTTP开销) |
| 双向 | 真正双向 | 伪双向 |
| 资源 | 低(一个连接) | 高(多次连接) |
| 复杂度 | 中等 | 较简单 |
WebSocket vs Server-Sent Events (SSE)
| 特性 | WebSocket | SSE |
|---|---|---|
| 双向通信 | ✅ 双向 | ❌ 仅服务器→客户端 |
| 数据格式 | 二进制或文本 | 文本(UTF-8) |
| 协议 | 独立协议 | 基于HTTP |
| 浏览器支持 | 广泛 | 广泛(除IE) |
| 重连 | 手动实现 | 自动重连 |
调试工具
浏览器开发者工具
- Chrome DevTools → Network → WS
- 查看帧的发送和接收
- 查看连接状态
命令行工具
# wscat (Node.js)
npm install -g wscat
wscat -c ws://echo.websocket.org
# websocat (Rust)
websocat ws://echo.websocket.org
在线测试工具
- websocket.org Echo Test
- Hoppscotch (原Postwoman)
- Firecamp
参考资源
相关RFC标准
- RFC 6455: WebSocket协议(本文档)
- RFC 7692: WebSocket压缩扩展
- RFC 8441: HTTP/2上的WebSocket引导
- RFC 8307: WebSocket的Well-Known URI
实现库推荐
JavaScript (浏览器):
- 原生WebSocket API
Node.js:
- ws (轻量级)
- Socket.IO (功能丰富,自动降级)
- uWebSockets.js (高性能)
Python:
- websockets (asyncio)
- ws4py
- Tornado
Java:
- Java WebSocket API (JSR 356)
- Netty
- Spring WebSocket
Go:
- gorilla/websocket
- nhooyr/websocket
下一步学习建议
- 基础: 理解WebSocket握手和帧结构
- 实践: 实现简单的聊天应用
- 进阶: 学习子协议和扩展
- 优化: 实现心跳、重连、错误处理
- 安全: 深入理解安全考虑和最佳实践
实战示例:完整的聊天应用
客户端实现(完整版)
class WebSocketClient {
constructor(url, protocols = []) {
this.url = url;
this.protocols = protocols;
this.ws = null;
this.reconnectAttempts = 0;
this.maxReconnectAttempts = 5;
this.reconnectDelay = 1000;
this.heartbeatInterval = null;
this.messageQueue = [];
}
connect() {
try {
this.ws = new WebSocket(this.url, this.protocols);
this.setupEventHandlers();
} catch (error) {
console.error('连接失败:', error);
this.handleReconnect();
}
}
setupEventHandlers() {
this.ws.onopen = (event) => {
console.log('✅ WebSocket连接已建立');
this.reconnectAttempts = 0;
// 发送积压的消息
this.flushMessageQueue();
// 启动心跳
this.startHeartbeat();
// 触发自定义事件
this.onConnectionOpen && this.onConnectionOpen(event);
};
this.ws.onmessage = (event) => {
console.log('📨 收到消息:', event.data);
try {
// 尝试解析JSON
const data = JSON.parse(event.data);
this.onMessage && this.onMessage(data);
} catch {
// 普通文本消息
this.onMessage && this.onMessage(event.data);
}
};
this.ws.onerror = (event) => {
console.error('❌ WebSocket错误:', event);
this.onError && this.onError(event);
};
this.ws.onclose = (event) => {
console.log('🔌 连接已关闭', {
code: event.code,
reason: event.reason,
wasClean: event.wasClean
});
// 停止心跳
this.stopHeartbeat();
// 根据关闭码决定是否重连
if (event.code !== 1000 && event.code !== 1001) {
this.handleReconnect();
}
this.onConnectionClose && this.onConnectionClose(event);
};
}
send(data) {
if (this.ws && this.ws.readyState === WebSocket.OPEN) {
const message = typeof data === 'object'
? JSON.stringify(data)
: data;
this.ws.send(message);
return true;
} else {
// 连接未就绪,加入队列
this.messageQueue.push(data);
return false;
}
}
flushMessageQueue() {
while (this.messageQueue.length > 0) {
const message = this.messageQueue.shift();
this.send(message);
}
}
startHeartbeat() {
this.heartbeatInterval = setInterval(() => {
if (this.ws.readyState === WebSocket.OPEN) {
this.send({ type: 'ping', timestamp: Date.now() });
}
}, 30000); // 30秒
}
stopHeartbeat() {
if (this.heartbeatInterval) {
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
}
handleReconnect() {
if (this.reconnectAttempts < this.maxReconnectAttempts) {
this.reconnectAttempts++;
const delay = this.reconnectDelay * Math.pow(2, this.reconnectAttempts - 1);
console.log(`🔄 ${delay}ms后尝试第${this.reconnectAttempts}次重连...`);
setTimeout(() => {
this.connect();
}, delay);
} else {
console.error('❌ 已达到最大重连次数');
this.onReconnectFailed && this.onReconnectFailed();
}
}
close(code = 1000, reason = '') {
this.stopHeartbeat();
if (this.ws) {
this.ws.close(code, reason);
}
}
getState() {
if (!this.ws) return 'CLOSED';
const states = ['CONNECTING', 'OPEN', 'CLOSING', 'CLOSED'];
return states[this.ws.readyState];
}
}
// 使用示例
const client = new WebSocketClient('wss://chat.example.com', ['chat-v1']);
client.onConnectionOpen = () => {
console.log('连接成功,可以发送消息了');
client.send({ type: 'join', room: 'general', user: 'Alice' });
};
client.onMessage = (data) => {
if (data.type === 'message') {
displayMessage(data);
} else if (data.type === 'pong') {
console.log('收到心跳响应');
}
};
client.onError = (event) => {
console.error('发生错误,请检查网络连接');
};
client.connect();
// 发送聊天消息
function sendChatMessage(text) {
client.send({
type: 'message',
content: text,
timestamp: Date.now()
});
}
服务器实现(Node.js - 生产级)
const WebSocket = require('ws');
const http = require('http');
const url = require('url');
class WebSocketServer {
constructor(options = {}) {
this.port = options.port || 8080;
this.clients = new Map();
this.rooms = new Map();
this.server = http.createServer();
this.wss = new WebSocket.Server({
server: this.server,
verifyClient: this.verifyClient.bind(this)
});
this.setupServer();
}
verifyClient(info, callback) {
// Origin验证
const allowedOrigins = [
'https://example.com',
'https://app.example.com'
];
const origin = info.origin || info.req.headers.origin;
if (!origin || !allowedOrigins.includes(origin)) {
callback(false, 403, 'Origin not allowed');
return;
}
// Token验证(从URL参数)
const params = url.parse(info.req.url, true).query;
if (!params.token || !this.validateToken(params.token)) {
callback(false, 401, 'Invalid token');
return;
}
callback(true);
}
validateToken(token) {
// 实际应该验证JWT等
return token && token.length > 0;
}
setupServer() {
this.wss.on('connection', (ws, request) => {
const clientId = this.generateClientId();
const clientInfo = {
id: clientId,
ws: ws,
ip: request.socket.remoteAddress,
connectedAt: new Date(),
isAlive: true,
user: null,
room: null
};
this.clients.set(clientId, clientInfo);
console.log(`✅ 新连接: ${clientId} from ${clientInfo.ip}`);
console.log(`📊 当前连接数: ${this.clients.size}`);
// 心跳检测
ws.on('pong', () => {
clientInfo.isAlive = true;
});
// 消息处理
ws.on('message', (data) => {
this.handleMessage(clientId, data);
});
// 错误处理
ws.on('error', (error) => {
console.error(`❌ 客户端${clientId}错误:`, error);
});
// 关闭处理
ws.on('close', (code, reason) => {
console.log(`🔌 客户端${clientId}断开: ${code} - ${reason}`);
this.handleClientDisconnect(clientId);
});
// 欢迎消息
this.sendToClient(clientId, {
type: 'welcome',
clientId: clientId,
serverTime: Date.now()
});
});
// 定期心跳检测
this.startHeartbeat();
}
handleMessage(clientId, data) {
try {
const message = JSON.parse(data);
const client = this.clients.get(clientId);
switch (message.type) {
case 'ping':
this.sendToClient(clientId, { type: 'pong', timestamp: Date.now() });
break;
case 'join':
this.handleJoinRoom(clientId, message.room, message.user);
break;
case 'message':
this.handleChatMessage(clientId, message);
break;
case 'leave':
this.handleLeaveRoom(clientId);
break;
default:
console.warn(`未知消息类型: ${message.type}`);
}
} catch (error) {
console.error('消息处理错误:', error);
this.sendToClient(clientId, {
type: 'error',
message: 'Invalid message format'
});
}
}
handleJoinRoom(clientId, roomName, userName) {
const client = this.clients.get(clientId);
if (!client) return;
// 离开当前房间
if (client.room) {
this.handleLeaveRoom(clientId);
}
// 加入新房间
if (!this.rooms.has(roomName)) {
this.rooms.set(roomName, new Set());
}
this.rooms.get(roomName).add(clientId);
client.room = roomName;
client.user = userName;
console.log(`👤 ${userName} 加入房间 ${roomName}`);
// 通知房间内其他人
this.broadcastToRoom(roomName, {
type: 'user_joined',
user: userName,
timestamp: Date.now()
}, clientId);
// 发送房间信息给新用户
const roomUsers = Array.from(this.rooms.get(roomName))
.map(id => this.clients.get(id)?.user)
.filter(u => u);
this.sendToClient(clientId, {
type: 'room_joined',
room: roomName,
users: roomUsers
});
}
handleChatMessage(clientId, message) {
const client = this.clients.get(clientId);
if (!client || !client.room) return;
// 广播消息到房间
this.broadcastToRoom(client.room, {
type: 'message',
from: client.user,
content: message.content,
timestamp: Date.now()
});
}
handleLeaveRoom(clientId) {
const client = this.clients.get(clientId);
if (!client || !client.room) return;
const roomName = client.room;
const room = this.rooms.get(roomName);
if (room) {
room.delete(clientId);
// 通知房间内其他人
this.broadcastToRoom(roomName, {
type: 'user_left',
user: client.user,
timestamp: Date.now()
});
// 如果房间为空,删除房间
if (room.size === 0) {
this.rooms.delete(roomName);
console.log(`🗑️ 房间 ${roomName} 已删除(无用户)`);
}
}
client.room = null;
}
handleClientDisconnect(clientId) {
this.handleLeaveRoom(clientId);
this.clients.delete(clientId);
console.log(`📊 当前连接数: ${this.clients.size}`);
}
sendToClient(clientId, data) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(JSON.stringify(data));
}
}
broadcastToRoom(roomName, data, excludeClientId = null) {
const room = this.rooms.get(roomName);
if (!room) return;
const message = JSON.stringify(data);
room.forEach(clientId => {
if (clientId !== excludeClientId) {
const client = this.clients.get(clientId);
if (client && client.ws.readyState === WebSocket.OPEN) {
client.ws.send(message);
}
}
});
}
startHeartbeat() {
setInterval(() => {
this.clients.forEach((client, clientId) => {
if (client.isAlive === false) {
console.log(`💀 客户端${clientId}心跳超时,断开连接`);
client.ws.terminate();
return;
}
client.isAlive = false;
client.ws.ping();
});
}, 30000);
}
generateClientId() {
return `client_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
start() {
this.server.listen(this.port, () => {
console.log(`🚀 WebSocket服务器运行在端口 ${this.port}`);
});
}
shutdown() {
console.log('🛑 正在关闭服务器...');
// 通知所有客户端
this.clients.forEach((client) => {
client.ws.close(1001, 'Server shutting down');
});
// 关闭服务器
this.wss.close(() => {
this.server.close(() => {
console.log('✅ 服务器已关闭');
});
});
}
}
// 启动服务器
const server = new WebSocketServer({ port: 8080 });
server.start();
// 优雅关闭
process.on('SIGTERM', () => server.shutdown());
process.on('SIGINT', () => server.shutdown());
Python服务器实现(asyncio)
import asyncio
import websockets
import json
import time
from collections import defaultdict
class WebSocketChatServer:
def __init__(self, host='localhost', port=8080):
self.host = host
self.port = port
self.clients = {}
self.rooms = defaultdict(set)
async def register(self, websocket, client_id):
"""注册新客户端"""
self.clients[client_id] = {
'ws': websocket,
'user': None,
'room': None,
'connected_at': time.time()
}
print(f"✅ 新连接: {client_id}")
async def unregister(self, client_id):
"""注销客户端"""
client = self.clients.get(client_id)
if client and client['room']:
await self.handle_leave_room(client_id)
if client_id in self.clients:
del self.clients[client_id]
print(f"🔌 断开: {client_id}")
async def handle_client(self, websocket, path):
"""处理单个客户端连接"""
client_id = f"client_{int(time.time())}_{id(websocket)}"
await self.register(websocket, client_id)
try:
# 发送欢迎消息
await websocket.send(json.dumps({
'type': 'welcome',
'client_id': client_id,
'timestamp': time.time()
}))
# 消息循环
async for message in websocket:
await self.handle_message(client_id, message)
except websockets.exceptions.ConnectionClosed:
print(f"连接关闭: {client_id}")
finally:
await self.unregister(client_id)
async def handle_message(self, client_id, message):
"""处理接收到的消息"""
try:
data = json.loads(message)
msg_type = data.get('type')
if msg_type == 'ping':
await self.send_to_client(client_id, {
'type': 'pong',
'timestamp': time.time()
})
elif msg_type == 'join':
await self.handle_join_room(
client_id,
data.get('room'),
data.get('user')
)
elif msg_type == 'message':
await self.handle_chat_message(client_id, data)
elif msg_type == 'leave':
await self.handle_leave_room(client_id)
except json.JSONDecodeError:
print(f"无效的JSON消息从 {client_id}")
except Exception as e:
print(f"处理消息错误: {e}")
async def handle_join_room(self, client_id, room_name, user_name):
"""处理加入房间"""
client = self.clients.get(client_id)
if not client:
return
# 离开当前房间
if client['room']:
await self.handle_leave_room(client_id)
# 加入新房间
self.rooms[room_name].add(client_id)
client['room'] = room_name
client['user'] = user_name
print(f"👤 {user_name} 加入房间 {room_name}")
# 通知房间其他成员
await self.broadcast_to_room(room_name, {
'type': 'user_joined',
'user': user_name,
'timestamp': time.time()
}, exclude=client_id)
# 发送房间信息
room_users = [
self.clients[cid]['user']
for cid in self.rooms[room_name]
if self.clients[cid]['user']
]
await self.send_to_client(client_id, {
'type': 'room_joined',
'room': room_name,
'users': room_users
})
async def handle_chat_message(self, client_id, data):
"""处理聊天消息"""
client = self.clients.get(client_id)
if not client or not client['room']:
return
await self.broadcast_to_room(client['room'], {
'type': 'message',
'from': client['user'],
'content': data.get('content'),
'timestamp': time.time()
})
async def handle_leave_room(self, client_id):
"""处理离开房间"""
client = self.clients.get(client_id)
if not client or not client['room']:
return
room_name = client['room']
# 从房间移除
self.rooms[room_name].discard(client_id)
# 通知其他成员
await self.broadcast_to_room(room_name, {
'type': 'user_left',
'user': client['user'],
'timestamp': time.time()
})
# 清空房间信息
client['room'] = None
# 删除空房间
if len(self.rooms[room_name]) == 0:
del self.rooms[room_name]
print(f"🗑️ 房间 {room_name} 已删除")
async def send_to_client(self, client_id, data):
"""发送消息给单个客户端"""
client = self.clients.get(client_id)
if client:
await client['ws'].send(json.dumps(data))
async def broadcast_to_room(self, room_name, data, exclude=None):
"""广播消息到房间"""
room = self.rooms.get(room_name)
if not room:
return
message = json.dumps(data)
tasks = []
for client_id in room:
if client_id != exclude:
client = self.clients.get(client_id)
if client:
tasks.append(client['ws'].send(message))
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
async def start(self):
"""启动服务器"""
async with websockets.serve(
self.handle_client,
self.host,
self.port,
ping_interval=30,
ping_timeout=10
):
print(f"🚀 WebSocket服务器运行在 ws://{self.host}:{self.port}")
await asyncio.Future() # 永久运行
# 启动
if __name__ == '__main__':
server = WebSocketChatServer()
asyncio.run(server.start())
性能优化技巧
1. 消息批处理
class BatchedWebSocket {
constructor(url) {
this.ws = new WebSocket(url);
this.messageQueue = [];
this.batchSize = 10;
this.batchTimeout = 100; // ms
this.timer = null;
}
send(data) {
this.messageQueue.push(data);
if (this.messageQueue.length >= this.batchSize) {
this.flush();
} else if (!this.timer) {
this.timer = setTimeout(() => this.flush(), this.batchTimeout);
}
}
flush() {
if (this.messageQueue.length === 0) return;
this.ws.send(JSON.stringify({
type: 'batch',
messages: this.messageQueue
}));
this.messageQueue = [];
clearTimeout(this.timer);
this.timer = null;
}
}
2. 消息压缩
// 使用permessage-deflate扩展
const ws = new WebSocket('wss://example.com', {
perMessageDeflate: {
zlibDeflateOptions: {
chunkSize: 1024,
memLevel: 7,
level: 3
},
threshold: 1024 // 只压缩>1KB的消息
}
});
3. 二进制帧优化
// 发送二进制数据(比JSON更高效)
const data = new Uint8Array([1, 2, 3, 4, 5]);
ws.send(data.buffer);
// 接收二进制数据
ws.binaryType = 'arraybuffer';
ws.onmessage = (event) => {
if (event.data instanceof ArrayBuffer) {
const view = new Uint8Array(event.data);
// 处理二进制数据
}
};
监控和指标
连接统计
class WebSocketMonitor {
constructor(wss) {
this.wss = wss;
this.metrics = {
totalConnections: 0,
activeConnections: 0,
messagesReceived: 0,
messagesSent: 0,
bytesReceived: 0,
bytesSent: 0,
errors: 0
};
this.startMonitoring();
}
startMonitoring() {
setInterval(() => {
this.metrics.activeConnections = this.wss.clients.size;
this.report();
}, 60000); // 每分钟报告
}
report() {
console.log('📊 WebSocket统计:');
console.log(` 活跃连接: ${this.metrics.activeConnections}`);
console.log(` 总连接数: ${this.metrics.totalConnections}`);
console.log(` 消息接收: ${this.metrics.messagesReceived}`);
console.log(` 消息发送: ${this.metrics.messagesSent}`);
console.log(` 接收字节: ${(this.metrics.bytesReceived / 1024).toFixed(2)} KB`);
console.log(` 发送字节: ${(this.metrics.bytesSent / 1024).toFixed(2)} KB`);
console.log(` 错误次数: ${this.metrics.errors}`);
}
}
总结
WebSocket协议是Web实时通信的基石:
- 信: 准确实现双向通信
- 达: 清晰的协议规范
- 雅: 优雅的设计和实现
核心价值:
- 🔄 真正的全双工通信
- ⚡ 低延迟实时数据传输
- 📦 轻量级帧协议
- 🔐 内置安全机制
最佳使用场景:
- 实时聊天
- 协作编辑
- 实时游戏
- 实时数据流
- 推送通知
相关RFC:
- RFC 6455: WebSocket协议(本文档)
- RFC 7692: WebSocket压缩扩展
- RFC 8441: HTTP/2上的WebSocket引导