Chapter 07

SSE 与 WebSocket 实时更新

用 htmx 扩展插件实现服务端推送与双向通信,无需手写 JavaScript 事件监听代码

Server-Sent Events(SSE)

什么是 SSE

SSE(Server-Sent Events)是 HTML5 的原生 API,允许服务器通过 HTTP 连接向客户端持续推送数据。与 WebSocket 相比:

特性SSEWebSocket
方向单向(服务器→客户端)双向
协议HTTP(可用现有基础设施)ws:// 协议升级
自动重连浏览器原生支持需手动实现
适用场景通知、实时数据、进度更新聊天、游戏、协同编辑
防火墙穿透好(标准 HTTP)可能被拦截

htmx SSE 扩展

在 htmx 2.0 中,SSE 功能通过扩展插件提供(不在核心包中):

<!-- 引入 htmx 核心 + SSE 扩展 -->
<script src="https://unpkg.com/htmx.org@2.0.0"></script>
<script src="https://unpkg.com/htmx-ext-sse@2.0.0"></script>

<!-- 连接 SSE 端点,监听 message 事件,更新目标元素 -->
<div
  hx-ext="sse"
  sse-connect="/api/notifications/stream"
  sse-swap="message"
  hx-target="#notifications"
  hx-swap="beforeend"
>
  <div id="notifications"></div>
</div>

<!-- 监听自定义命名事件 -->
<div
  hx-ext="sse"
  sse-connect="/api/live-feed"
>
  <!-- 监听名为 "newPost" 的 SSE 事件 -->
  <div
    sse-swap="newPost"
    hx-target="#post-list"
    hx-swap="afterbegin"
  ></div>

  <!-- 监听名为 "statsUpdate" 的 SSE 事件 -->
  <div
    sse-swap="statsUpdate"
    hx-target="#dashboard-stats"
    hx-swap="innerHTML"
  ></div>

  <div id="post-list"></div>
  <div id="dashboard-stats"></div>
</div>

服务端 SSE 实现(FastAPI)

from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json

@app.get("/api/notifications/stream")
async def notification_stream(request: Request):
    async def event_generator():
        while True:
            # 检查客户端是否已断开
            if await request.is_disconnected():
                break

            notification = await get_pending_notification()
            if notification:
                # SSE 格式:data: HTML内容\n\n
                html = f"""<div class="notif">
  <strong>{notification.title}</strong>
  <p>{notification.body}</p>
</div>"""
                yield f"data: {html}\n\n"

            await asyncio.sleep(1)

    return StreamingResponse(
        event_generator(),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "X-Accel-Buffering": "no",  # Nginx 禁用缓冲
        }
    )

@app.get("/api/live-feed")
async def live_feed():
    async def gen():
        async for event in subscribe_to_events():
            if event.type == "post":
                # 命名事件:event: newPost
                yield f"event: newPost\ndata: {render_post(event.data)}\n\n"
            elif event.type == "stats":
                yield f"event: statsUpdate\ndata: {render_stats(event.data)}\n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")

实战:任务进度条

<!-- 前端:启动任务并显示进度 -->
<button hx-post="/api/jobs" hx-target="#job-status">
  开始处理
</button>

<div id="job-status"></div>

<!-- 服务器返回的进度监控片段 -->
<div hx-ext="sse" sse-connect="/api/jobs/42/progress">
  <div sse-swap="progress" hx-target="#progress-bar"></div>
  <div sse-swap="complete" hx-target="#job-status" hx-swap="outerHTML"></div>
  <div id="progress-bar">
    <div class="bar" style="width: 0%">0%</div>
  </div>
</div>
@app.get("/api/jobs/{job_id}/progress")
async def job_progress(job_id: int):
    async def gen():
        while True:
            progress = await get_job_progress(job_id)
            bar_html = f'<div class="bar" style="width:{progress}%">{progress}%</div>'
            yield f"event: progress\ndata: {bar_html}\n\n"

            if progress >= 100:
                result_html = '<p class="success">✓ 处理完成!<a href="/results">查看结果</a></p>'
                yield f"event: complete\ndata: {result_html}\n\n"
                break
            await asyncio.sleep(0.5)

    return StreamingResponse(gen(), media_type="text/event-stream")

WebSocket

htmx WebSocket 扩展

<!-- 引入 WebSocket 扩展 -->
<script src="https://unpkg.com/htmx-ext-ws@2.0.0"></script>

<!-- 聊天室示例 -->
<div
  hx-ext="ws"
  ws-connect="/ws/chat/room-1"
>
  <!-- 消息列表:服务器推送的 HTML 片段在这里追加 -->
  <div
    id="chat-messages"
    hx-swap-oob="beforeend"
  ></div>

  <!-- 发送消息表单 -->
  <form ws-send>
    <!-- ws-send 属性使表单通过 WebSocket 发送,而非 HTTP -->
    <input name="message" placeholder="输入消息..." autocomplete="off"/>
    <button type="submit">发送</button>
  </form>
</div>
# FastAPI WebSocket 聊天服务器
from fastapi import WebSocket, WebSocketDisconnect
from typing import List

class ChatRoom:
    def __init__(self):
        self.connections: List[WebSocket] = []

    async def connect(self, ws: WebSocket):
        await ws.accept()
        self.connections.append(ws)

    def disconnect(self, ws: WebSocket):
        self.connections.remove(ws)

    async def broadcast(self, html: str):
        for conn in self.connections:
            await conn.send_text(html)

room = ChatRoom()

@app.websocket("/ws/chat/{room_id}")
async def chat(websocket: WebSocket, room_id: str):
    await room.connect(websocket)
    try:
        while True:
            data = await websocket.receive_json()
            msg = data.get("message", "")
            user = data.get("HEADERS", {}).get("HX-Trigger-Name", "匿名")

            # 广播给所有连接的客户端(HTML 片段)
            html = f"""<div id="chat-messages" hx-swap-oob="beforeend">
  <div class="msg">
    <strong>{user}:</strong> {msg}
  </div>
</div>"""
            await room.broadcast(html)
    except WebSocketDisconnect:
        room.disconnect(websocket)
WebSocket 消息中的 HEADERS

htmx 通过 WebSocket 发送表单数据时,同时会发送一个 HEADERS 字段,包含常规 htmx 请求头信息(HX-RequestHX-TriggerHX-Current-URL 等),服务端可以用来识别请求来源和上下文。

SSE 与 Nginx 的缓冲问题

Nginx 默认会缓冲后端响应,这会破坏 SSE 的实时性——数据会堆积在 Nginx 缓冲区而不是立即传给浏览器。SSE 端点的 FastAPI/Go 响应头中必须设置 X-Accel-Buffering: no,或在 Nginx 配置中针对该 location 添加 proxy_buffering off;。同理,如果用了 CDN,也需要确保 CDN 对 SSE 端点不做缓冲。

SSE vs WebSocket 选型指南

选择的关键问题:客户端是否需要向服务器发送数据?

本章小结

SSE 与 WebSocket 的核心要点:① SSE 是单向(服务器→客户端)、基于 HTTP 的实时推送,浏览器原生支持自动重连,适合通知、日志流、进度更新等场景;② WebSocket 是双向全双工通信,适合聊天、协同编辑、游戏等需要客户端主动发送数据的场景;③ htmx 2.0 将 SSE 和 WebSocket 分拆为独立扩展(htmx-ext-ssehtmx-ext-ws),需单独引入;④ SSE 扩展中 sse-connect 建立连接,sse-swap 指定监听的事件名,多个子元素可监听同一连接上的不同事件;⑤ 服务器发送 SSE 时数据格式为 data: HTML内容\n\n,命名事件格式为 event: 事件名\ndata: 内容\n\n;⑥ Nginx 代理 SSE 时必须禁用缓冲(proxy_buffering offX-Accel-Buffering: no)。