Appearance
SSE / WebSocket 实时通信
概述
实时通信是现代 Web 应用的重要功能。SSE(服务器到客户端的单向推送)和 WebSocket(全双工双向通信)是两种主流的实现技术。
SSE (Server-Sent Events)
1. 基本用法
javascript
// Open a Server-Sent Events stream against the events endpoint.
const eventSource = new EventSource('/api/events')

// Lifecycle: fires once the stream has been established.
eventSource.onopen = () => {
  console.log('SSE 连接已建立')
}

// Lifecycle: fires when the stream reports an error.
eventSource.onerror = (error) => {
  console.error('SSE 错误:', error)
}

// Default (unnamed) events: the payload arrives as a JSON string in `data`.
eventSource.onmessage = ({ data }) => {
  console.log('收到消息:', JSON.parse(data))
}

// Named events need an explicit listener for their event name.
eventSource.addEventListener('custom-event', ({ data }) => {
  console.log('自定义事件:', JSON.parse(data))
})
// 关闭连接
eventSource.close()

2. 封装 SSE 客户端
javascript
/**
 * Wrapper around the browser EventSource API that adds a listener registry
 * and a bounded reconnect loop with linearly growing delay.
 *
 * Events delivered through on():
 *   - 'open'    : no payload
 *   - 'message' : the JSON-parsed payload of a default SSE message
 *   - 'error'   : the error event reported by the underlying EventSource
 */
class SSEClient {
  /**
   * @param {string} url - Endpoint that serves the event stream.
   * @param {Object} [options]
   * @param {number} [options.maxReconnectAttempts=5] - Maximum number of
   *   consecutive reconnect attempts; an explicit 0 disables reconnecting.
   */
  constructor(url, options = {}) {
    this.url = url
    this.options = options
    this.eventSource = null
    this.reconnectAttempts = 0
    // Fall back to the default only when no usable number was supplied,
    // so that an explicit 0 is honored instead of being replaced by 5.
    const { maxReconnectAttempts } = options
    this.maxReconnectAttempts =
      typeof maxReconnectAttempts === 'number' && !Number.isNaN(maxReconnectAttempts)
        ? maxReconnectAttempts
        : 5
    // Handle of the pending reconnect timeout, or null when none is scheduled.
    this.reconnectTimer = null
    this.listeners = new Map()
  }

  /**
   * Opens the stream and wires the EventSource callbacks to this client's
   * events. Any previous stream or pending reconnect is discarded first so
   * that repeated calls never leave two live connections behind.
   */
  connect() {
    this.clearReconnectTimer()
    this.closeSource()

    const source = new EventSource(this.url)
    this.eventSource = source

    source.onopen = () => {
      console.log('SSE 连接成功')
      this.reconnectAttempts = 0
      this.emit('open')
    }

    source.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data)
        this.emit('message', data)
      } catch (error) {
        console.error('解析消息失败:', error)
      }
    }

    source.onerror = (error) => {
      console.error('SSE 错误:', error)
      this.emit('error', error)
      // An 'error' listener may have called close() or connect(); in that
      // case this stream is no longer current and must not trigger a retry.
      if (this.eventSource === source) {
        this.reconnect()
      }
    }
  }

  /**
   * Schedules the next connection attempt, waiting 1s per attempt made so far
   * (capped at 10s). Does nothing beyond cleanup once the attempt limit is hit.
   */
  reconnect() {
    // Close the failed stream first: a native EventSource retries on its own,
    // so leaving it open next to a fresh one would duplicate connections.
    this.closeSource()
    this.clearReconnectTimer()

    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++
      const delay = Math.min(1000 * this.reconnectAttempts, 10000)
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null
        console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
        this.connect()
      }, delay)
    }
  }

  /**
   * Registers a callback for one of the client's events.
   * @param {string} event - Event name ('open', 'message', 'error').
   * @param {Function} callback - Invoked with the event payload.
   */
  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, [])
    }
    this.listeners.get(event).push(callback)
  }

  /**
   * Invokes every callback registered for `event`, in registration order.
   * @param {string} event - Event name.
   * @param {*} [data] - Payload handed to each callback.
   */
  emit(event, data) {
    const callbacks = this.listeners.get(event)
    if (callbacks) {
      callbacks.forEach(callback => callback(data))
    }
  }

  /**
   * Closes the stream and cancels any scheduled reconnect, so the client
   * stays closed until connect() is called again.
   */
  close() {
    this.clearReconnectTimer()
    this.closeSource()
  }

  /** Cancels the pending reconnect timeout, if there is one. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }

  /** Closes and forgets the current EventSource, if there is one. */
  closeSource() {
    if (this.eventSource) {
      this.eventSource.close()
      this.eventSource = null
    }
  }
}
// Usage example: create a client that allows up to 10 reconnect attempts.
const sseOptions = { maxReconnectAttempts: 10 }
const sse = new SSEClient('/api/events', sseOptions)

// Print every payload delivered through the client's 'message' event.
sse.on('message', (payload) => {
  console.log('收到消息:', payload)
})
sse.connect()

3. 服务端实现 (Node.js)
javascript
// Express route that streams Server-Sent Events to each connected client.
const express = require('express')
const app = express()
app.get('/api/events', (req, res) => {
// Declare the response as an event stream, disable caching and keep the connection open
res.setHeader('Content-Type', 'text/event-stream')
res.setHeader('Cache-Control', 'no-cache')
res.setHeader('Connection', 'keep-alive')
// Write one event: a `data:` line holding the JSON payload, terminated by a blank line
const sendEvent = (data) => {
res.write(`data: ${JSON.stringify(data)}\n\n`)
}
// Push a timestamped message every 1000 ms
const interval = setInterval(() => {
sendEvent({
time: new Date().toISOString(),
message: 'Hello from server'
})
}, 1000)
// When the request closes, stop the timer and end the response
req.on('close', () => {
clearInterval(interval)
res.end()
})
})

WebSocket
1. 基本用法
javascript
// Open a WebSocket connection to the local server
const ws = new WebSocket('ws://localhost:8080')
// Fires once the connection is open
ws.onopen = () => {
console.log('WebSocket 连接已建立')
// Sending happens inside onopen, i.e. only after the connection is open
ws.send('Hello Server!')
}
// Fires for every incoming message; the payload is in event.data
ws.onmessage = (event) => {
console.log('收到消息:', event.data)
}
// Fires when the connection reports an error
ws.onerror = (error) => {
console.error('WebSocket 错误:', error)
}
// Fires when the connection has been closed
ws.onclose = () => {
console.log('WebSocket 连接已关闭')
}

2. 封装 WebSocket 客户端
javascript
/**
 * WebSocket wrapper that adds a listener registry, JSON (de)serialization,
 * a ping heartbeat, an outbound queue for messages sent while disconnected,
 * and a bounded reconnect loop with linearly growing delay.
 *
 * Events delivered through on():
 *   - 'open'    : no payload
 *   - 'message' : the JSON-parsed payload (messages of type 'pong' are swallowed)
 *   - 'error'   : the error event reported by the underlying WebSocket
 *   - 'close'   : no payload
 */
class WebSocketClient {
  /**
   * @param {string} url - WebSocket endpoint.
   * @param {Object} [options]
   * @param {number} [options.maxReconnectAttempts=5] - Maximum number of
   *   consecutive reconnect attempts; an explicit 0 disables reconnecting.
   * @param {number} [options.heartbeatInterval=30000] - Milliseconds between
   *   `{ type: 'ping' }` messages while the socket is open.
   */
  constructor(url, options = {}) {
    this.url = url
    this.options = options
    this.ws = null
    this.reconnectAttempts = 0
    // Fall back to the default only when no usable number was supplied,
    // so that an explicit 0 is honored instead of being replaced by 5.
    const { maxReconnectAttempts } = options
    this.maxReconnectAttempts =
      typeof maxReconnectAttempts === 'number' && !Number.isNaN(maxReconnectAttempts)
        ? maxReconnectAttempts
        : 5
    this.heartbeatInterval = options.heartbeatInterval || 30000
    this.heartbeatTimer = null
    // Handle of the pending reconnect timeout, or null when none is scheduled.
    this.reconnectTimer = null
    // Set by close() so the resulting 'close' event does not trigger a reconnect.
    this.manuallyClosed = false
    this.listeners = new Map()
    this.messageQueue = []
  }

  /**
   * Opens the socket and wires its callbacks to this client's events.
   * Cancels any pending reconnect and clears the manual-close flag.
   */
  connect() {
    this.manuallyClosed = false
    this.clearReconnectTimer()

    const socket = new WebSocket(this.url)
    this.ws = socket

    socket.onopen = () => {
      console.log('WebSocket 连接成功')
      this.reconnectAttempts = 0
      this.startHeartbeat()
      this.flushMessageQueue()
      this.emit('open')
    }

    socket.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data)
        // Heartbeat replies are internal and not forwarded to listeners
        if (data.type === 'pong') {
          return
        }
        this.emit('message', data)
      } catch (error) {
        console.error('解析消息失败:', error)
      }
    }

    socket.onerror = (error) => {
      console.error('WebSocket 错误:', error)
      this.emit('error', error)
    }

    socket.onclose = () => {
      console.log('WebSocket 连接关闭')
      this.stopHeartbeat()
      this.emit('close')
      // Reconnect only after an unexpected drop of the current socket: not
      // after close(), and not when a listener already replaced the socket.
      if (!this.manuallyClosed && this.ws === socket) {
        this.reconnect()
      }
    }
  }

  /**
   * Schedules the next connection attempt, waiting 1s per attempt made so far
   * (capped at 10s). Does nothing once the attempt limit has been reached.
   */
  reconnect() {
    this.clearReconnectTimer()
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++
      const delay = Math.min(1000 * this.reconnectAttempts, 10000)
      this.reconnectTimer = setTimeout(() => {
        this.reconnectTimer = null
        console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
        this.connect()
      }, delay)
    }
  }

  /** Starts the periodic ping, replacing any heartbeat that is already running. */
  startHeartbeat() {
    this.stopHeartbeat()
    this.heartbeatTimer = setInterval(() => {
      if (this.ws && this.ws.readyState === WebSocket.OPEN) {
        this.send({ type: 'ping' })
      }
    }, this.heartbeatInterval)
  }

  /** Stops the periodic ping, if it is running. */
  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer)
      this.heartbeatTimer = null
    }
  }

  /**
   * Sends a message, or queues it when the socket is not open (including
   * before the first connect()). Queued messages are sent on the next 'open'.
   * @param {string|*} data - Strings are sent as-is; anything else is JSON-encoded.
   */
  send(data) {
    const message = typeof data === 'string' ? data : JSON.stringify(data)
    if (this.ws && this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message)
    } else {
      this.messageQueue.push(message)
    }
  }

  /** Sends every queued message in the order it was queued. */
  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift()
      this.ws.send(message)
    }
  }

  /**
   * Registers a callback for one of the client's events.
   * @param {string} event - Event name ('open', 'message', 'error', 'close').
   * @param {Function} callback - Invoked with the event payload.
   */
  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, [])
    }
    this.listeners.get(event).push(callback)
  }

  /**
   * Invokes every callback registered for `event`, in registration order.
   * @param {string} event - Event name.
   * @param {*} [data] - Payload handed to each callback.
   */
  emit(event, data) {
    const callbacks = this.listeners.get(event)
    if (callbacks) {
      callbacks.forEach(callback => callback(data))
    }
  }

  /**
   * Closes the socket for good: stops the heartbeat, cancels any scheduled
   * reconnect and suppresses the reconnect that the 'close' event would
   * otherwise start. Call connect() to open a new connection afterwards.
   */
  close() {
    // Must be set before the socket is closed so onclose sees it
    this.manuallyClosed = true
    this.clearReconnectTimer()
    this.stopHeartbeat()
    if (this.ws) {
      this.ws.close()
      this.ws = null
    }
  }

  /** Cancels the pending reconnect timeout, if there is one. */
  clearReconnectTimer() {
    if (this.reconnectTimer !== null) {
      clearTimeout(this.reconnectTimer)
      this.reconnectTimer = null
    }
  }
}
// Usage example: up to 10 reconnect attempts, one heartbeat every 30 seconds.
const wsClientOptions = {
  maxReconnectAttempts: 10,
  heartbeatInterval: 30000
}
const wsClient = new WebSocketClient('ws://localhost:8080', wsClientOptions)

// Print every payload delivered through the client's 'message' event.
wsClient.on('message', (payload) => {
  console.log('收到消息:', payload)
})
wsClient.connect()

3. 服务端实现 (Node.js)
javascript
const WebSocket = require('ws')
const wss = new WebSocket.Server({ port: 8080 })
// Every socket that is currently connected; this is the broadcast audience.
const clients = new Set()

wss.on('connection', (ws) => {
  clients.add(ws)
  console.log('客户端已连接,当前连接数:', clients.size)

  // Incoming message: answer heartbeats directly, relay everything else.
  // Any failure inside (parsing, replying, broadcasting) is logged below.
  const handleMessage = (message) => {
    try {
      const payload = JSON.parse(message)
      if (payload.type === 'ping') {
        ws.send(JSON.stringify({ type: 'pong' }))
      } else {
        broadcast(payload)
      }
    } catch (error) {
      console.error('解析消息失败:', error)
    }
  }

  // Socket closed: drop it from the audience.
  const handleClose = () => {
    clients.delete(ws)
    console.log('客户端已断开,当前连接数:', clients.size)
  }

  // Socket error: only logged here.
  const handleError = (error) => {
    console.error('WebSocket 错误:', error)
  }

  ws.on('message', handleMessage)
  ws.on('close', handleClose)
  ws.on('error', handleError)
})
/**
 * Sends `data`, JSON-encoded once, to every tracked client whose socket is open.
 * @param {*} data - Value to serialize and deliver.
 */
function broadcast(data) {
  const serialized = JSON.stringify(data)
  for (const client of clients) {
    // Skip sockets that are not in the OPEN state
    if (client.readyState !== WebSocket.OPEN) {
      continue
    }
    client.send(serialized)
  }
}
console.log('WebSocket 服务器已启动: ws://localhost:8080')

SSE vs WebSocket
| 特性 | SSE | WebSocket |
|---|---|---|
| 通信方式 | 单向(服务器→客户端) | 双向 |
| 协议 | HTTP | WebSocket(ws:// 或 wss://,通过 HTTP Upgrade 握手建立) |
| 断线重连 | 自动 | 需手动实现 |
| 数据格式 | 文本 | 文本/二进制 |
| 浏览器支持 | 现代浏览器均支持(IE 不支持,需 polyfill) | 现代浏览器均支持(含 IE 10+) |
| 适用场景 | 实时推送、通知 | 聊天、游戏 |
最佳实践
1. 连接管理
- 实现断线重连
- 心跳检测
- 连接状态监控
2. 性能优化
- 消息压缩
- 批量发送
- 连接池管理
3. 安全性
- 使用 WSS (WebSocket Secure)
- 身份验证
- 消息加密