Skip to content

SSE / WebSocket 实时通信

概述

实时通信是现代 Web 应用的重要功能,SSE 和 WebSocket 是两种主流的服务器推送技术。

SSE (Server-Sent Events)

1. 基本用法

javascript
// 创建 SSE 连接
const eventSource = new EventSource('/api/events')

// 监听消息
eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data)
  console.log('收到消息:', data)
}

// 监听自定义事件
eventSource.addEventListener('custom-event', (event) => {
  const data = JSON.parse(event.data)
  console.log('自定义事件:', data)
})

// 监听连接打开
eventSource.onopen = () => {
  console.log('SSE 连接已建立')
}

// 监听错误
eventSource.onerror = (error) => {
  console.error('SSE 错误:', error)
}

// 关闭连接
eventSource.close()

2. 封装 SSE 客户端

javascript
class SSEClient {
  constructor(url, options = {}) {
    this.url = url
    this.options = options
    this.eventSource = null
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5
    this.listeners = new Map()
  }

  connect() {
    this.eventSource = new EventSource(this.url)

    this.eventSource.onopen = () => {
      console.log('SSE 连接成功')
      this.reconnectAttempts = 0
      this.emit('open')
    }

    this.eventSource.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data)
        this.emit('message', data)
      } catch (error) {
        console.error('解析消息失败:', error)
      }
    }

    this.eventSource.onerror = (error) => {
      console.error('SSE 错误:', error)
      this.emit('error', error)
      this.reconnect()
    }
  }

  reconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++
      const delay = Math.min(1000 * this.reconnectAttempts, 10000)
      
      setTimeout(() => {
        console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
        this.connect()
      }, delay)
    }
  }

  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, [])
    }
    this.listeners.get(event).push(callback)
  }

  emit(event, data) {
    const callbacks = this.listeners.get(event)
    if (callbacks) {
      callbacks.forEach(callback => callback(data))
    }
  }

  close() {
    if (this.eventSource) {
      this.eventSource.close()
      this.eventSource = null
    }
  }
}

// 使用示例
const sse = new SSEClient('/api/events', {
  maxReconnectAttempts: 10
})

sse.on('message', (data) => {
  console.log('收到消息:', data)
})

sse.connect()

3. 服务端实现 (Node.js)

javascript
const express = require('express')
const app = express()

app.get('/api/events', (req, res) => {
  res.setHeader('Content-Type', 'text/event-stream')
  res.setHeader('Cache-Control', 'no-cache')
  res.setHeader('Connection', 'keep-alive')

  // 发送消息
  const sendEvent = (data) => {
    res.write(`data: ${JSON.stringify(data)}\n\n`)
  }

  // 定期发送消息
  const interval = setInterval(() => {
    sendEvent({
      time: new Date().toISOString(),
      message: 'Hello from server'
    })
  }, 1000)

  // 客户端断开连接时清理
  req.on('close', () => {
    clearInterval(interval)
    res.end()
  })
})

WebSocket

1. 基本用法

javascript
// 创建 WebSocket 连接
const ws = new WebSocket('ws://localhost:8080')

// 监听连接打开
ws.onopen = () => {
  console.log('WebSocket 连接已建立')
  
  // 发送消息
  ws.send('Hello Server!')
}

// 监听消息
ws.onmessage = (event) => {
  console.log('收到消息:', event.data)
}

// 监听错误
ws.onerror = (error) => {
  console.error('WebSocket 错误:', error)
}

// 监听连接关闭
ws.onclose = () => {
  console.log('WebSocket 连接已关闭')
}

2. 封装 WebSocket 客户端

javascript
class WebSocketClient {
  constructor(url, options = {}) {
    this.url = url
    this.options = options
    this.ws = null
    this.reconnectAttempts = 0
    this.maxReconnectAttempts = options.maxReconnectAttempts || 5
    this.heartbeatInterval = options.heartbeatInterval || 30000
    this.heartbeatTimer = null
    this.listeners = new Map()
    this.messageQueue = []
  }

  connect() {
    this.ws = new WebSocket(this.url)

    this.ws.onopen = () => {
      console.log('WebSocket 连接成功')
      this.reconnectAttempts = 0
      this.startHeartbeat()
      this.flushMessageQueue()
      this.emit('open')
    }

    this.ws.onmessage = (event) => {
      try {
        const data = JSON.parse(event.data)
        
        // 处理心跳响应
        if (data.type === 'pong') {
          return
        }
        
        this.emit('message', data)
      } catch (error) {
        console.error('解析消息失败:', error)
      }
    }

    this.ws.onerror = (error) => {
      console.error('WebSocket 错误:', error)
      this.emit('error', error)
    }

    this.ws.onclose = () => {
      console.log('WebSocket 连接关闭')
      this.stopHeartbeat()
      this.emit('close')
      this.reconnect()
    }
  }

  reconnect() {
    if (this.reconnectAttempts < this.maxReconnectAttempts) {
      this.reconnectAttempts++
      const delay = Math.min(1000 * this.reconnectAttempts, 10000)
      
      setTimeout(() => {
        console.log(`尝试重连 (${this.reconnectAttempts}/${this.maxReconnectAttempts})`)
        this.connect()
      }, delay)
    }
  }

  startHeartbeat() {
    this.heartbeatTimer = setInterval(() => {
      if (this.ws.readyState === WebSocket.OPEN) {
        this.send({ type: 'ping' })
      }
    }, this.heartbeatInterval)
  }

  stopHeartbeat() {
    if (this.heartbeatTimer) {
      clearInterval(this.heartbeatTimer)
      this.heartbeatTimer = null
    }
  }

  send(data) {
    const message = typeof data === 'string' ? data : JSON.stringify(data)
    
    if (this.ws.readyState === WebSocket.OPEN) {
      this.ws.send(message)
    } else {
      // 连接未打开时,将消息加入队列
      this.messageQueue.push(message)
    }
  }

  flushMessageQueue() {
    while (this.messageQueue.length > 0) {
      const message = this.messageQueue.shift()
      this.ws.send(message)
    }
  }

  on(event, callback) {
    if (!this.listeners.has(event)) {
      this.listeners.set(event, [])
    }
    this.listeners.get(event).push(callback)
  }

  emit(event, data) {
    const callbacks = this.listeners.get(event)
    if (callbacks) {
      callbacks.forEach(callback => callback(data))
    }
  }

  close() {
    this.stopHeartbeat()
    if (this.ws) {
      this.ws.close()
      this.ws = null
    }
  }
}

// 使用示例
const wsClient = new WebSocketClient('ws://localhost:8080', {
  maxReconnectAttempts: 10,
  heartbeatInterval: 30000
})

wsClient.on('message', (data) => {
  console.log('收到消息:', data)
})

wsClient.connect()

3. 服务端实现 (Node.js)

javascript
const WebSocket = require('ws')

const wss = new WebSocket.Server({ port: 8080 })

const clients = new Set()

wss.on('connection', (ws) => {
  clients.add(ws)
  
  console.log('客户端已连接,当前连接数:', clients.size)

  ws.on('message', (message) => {
    try {
      const data = JSON.parse(message)
      
      // 处理心跳
      if (data.type === 'ping') {
        ws.send(JSON.stringify({ type: 'pong' }))
        return
      }
      
      // 广播消息给所有客户端
      broadcast(data)
    } catch (error) {
      console.error('解析消息失败:', error)
    }
  })

  ws.on('close', () => {
    clients.delete(ws)
    console.log('客户端已断开,当前连接数:', clients.size)
  })

  ws.on('error', (error) => {
    console.error('WebSocket 错误:', error)
  })
})

function broadcast(data) {
  const message = JSON.stringify(data)
  clients.forEach(client => {
    if (client.readyState === WebSocket.OPEN) {
      client.send(message)
    }
  })
}

console.log('WebSocket 服务器已启动: ws://localhost:8080')

SSE vs WebSocket

特性SSEWebSocket
通信方式单向(服务器→客户端)双向
协议HTTPWebSocket
断线重连自动需手动实现
数据格式文本文本/二进制
浏览器支持较好较好
适用场景实时推送、通知聊天、游戏

最佳实践

1. 连接管理

  • 实现断线重连
  • 心跳检测
  • 连接状态监控

2. 性能优化

  • 消息压缩
  • 批量发送
  • 连接池管理

3. 安全性

  • 使用 WSS (WebSocket Secure)
  • 身份验证
  • 消息加密

相关资源

基于 VitePress 的本地知识库