Server-Sent Events(SSE)
什么是 SSE
SSE(Server-Sent Events)是 HTML5 的原生 API,允许服务器通过 HTTP 连接向客户端持续推送数据。与 WebSocket 相比:
| 特性 | SSE | WebSocket |
|---|---|---|
| 方向 | 单向(服务器→客户端) | 双向 |
| 协议 | HTTP(可用现有基础设施) | ws:// 协议升级 |
| 自动重连 | 浏览器原生支持 | 需手动实现 |
| 适用场景 | 通知、实时数据、进度更新 | 聊天、游戏、协同编辑 |
| 防火墙穿透 | 好(标准 HTTP) | 可能被拦截 |
htmx SSE 扩展
在 htmx 2.0 中,SSE 功能通过扩展插件提供(不在核心包中):
<!-- 引入 htmx 核心 + SSE 扩展 -->
<script src="https://unpkg.com/htmx.org@2.0.0"></script>
<script src="https://unpkg.com/htmx-ext-sse@2.0.0"></script>
<!-- 连接 SSE 端点,监听 message 事件,更新目标元素 -->
<div
hx-ext="sse"
sse-connect="/api/notifications/stream"
sse-swap="message"
hx-target="#notifications"
hx-swap="beforeend"
>
<div id="notifications"></div>
</div>
<!-- 监听自定义命名事件 -->
<div
hx-ext="sse"
sse-connect="/api/live-feed"
>
<!-- 监听名为 "newPost" 的 SSE 事件 -->
<div
sse-swap="newPost"
hx-target="#post-list"
hx-swap="afterbegin"
></div>
<!-- 监听名为 "statsUpdate" 的 SSE 事件 -->
<div
sse-swap="statsUpdate"
hx-target="#dashboard-stats"
hx-swap="innerHTML"
></div>
<div id="post-list"></div>
<div id="dashboard-stats"></div>
</div>
服务端 SSE 实现(FastAPI)
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
import asyncio
import json
@app.get("/api/notifications/stream")
async def notification_stream(request: Request):
async def event_generator():
while True:
# 检查客户端是否已断开
if await request.is_disconnected():
break
notification = await get_pending_notification()
if notification:
# SSE 格式:data: HTML内容\n\n
html = f"""<div class="notif">
<strong>{notification.title}</strong>
<p>{notification.body}</p>
</div>"""
yield f"data: {html}\n\n"
await asyncio.sleep(1)
return StreamingResponse(
event_generator(),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"X-Accel-Buffering": "no", # Nginx 禁用缓冲
}
)
@app.get("/api/live-feed")
async def live_feed():
async def gen():
async for event in subscribe_to_events():
if event.type == "post":
# 命名事件:event: newPost
yield f"event: newPost\ndata: {render_post(event.data)}\n\n"
elif event.type == "stats":
yield f"event: statsUpdate\ndata: {render_stats(event.data)}\n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
实战:任务进度条
<!-- 前端:启动任务并显示进度 -->
<button hx-post="/api/jobs" hx-target="#job-status">
开始处理
</button>
<div id="job-status"></div>
<!-- 服务器返回的进度监控片段 -->
<div hx-ext="sse" sse-connect="/api/jobs/42/progress">
<div sse-swap="progress" hx-target="#progress-bar"></div>
<div sse-swap="complete" hx-target="#job-status" hx-swap="outerHTML"></div>
<div id="progress-bar">
<div class="bar" style="width: 0%">0%</div>
</div>
</div>
@app.get("/api/jobs/{job_id}/progress")
async def job_progress(job_id: int):
async def gen():
while True:
progress = await get_job_progress(job_id)
bar_html = f'<div class="bar" style="width:{progress}%">{progress}%</div>'
yield f"event: progress\ndata: {bar_html}\n\n"
if progress >= 100:
result_html = '<p class="success">✓ 处理完成!<a href="/results">查看结果</a></p>'
yield f"event: complete\ndata: {result_html}\n\n"
break
await asyncio.sleep(0.5)
return StreamingResponse(gen(), media_type="text/event-stream")
WebSocket
htmx WebSocket 扩展
<!-- 引入 WebSocket 扩展 -->
<script src="https://unpkg.com/htmx-ext-ws@2.0.0"></script>
<!-- 聊天室示例 -->
<div
hx-ext="ws"
ws-connect="/ws/chat/room-1"
>
<!-- 消息列表:服务器推送的 HTML 片段在这里追加 -->
<div
id="chat-messages"
hx-swap-oob="beforeend"
></div>
<!-- 发送消息表单 -->
<form ws-send>
<!-- ws-send 属性使表单通过 WebSocket 发送,而非 HTTP -->
<input name="message" placeholder="输入消息..." autocomplete="off"/>
<button type="submit">发送</button>
</form>
</div>
# FastAPI WebSocket 聊天服务器
from fastapi import WebSocket, WebSocketDisconnect
from typing import List
class ChatRoom:
def __init__(self):
self.connections: List[WebSocket] = []
async def connect(self, ws: WebSocket):
await ws.accept()
self.connections.append(ws)
def disconnect(self, ws: WebSocket):
self.connections.remove(ws)
async def broadcast(self, html: str):
for conn in self.connections:
await conn.send_text(html)
room = ChatRoom()
@app.websocket("/ws/chat/{room_id}")
async def chat(websocket: WebSocket, room_id: str):
await room.connect(websocket)
try:
while True:
data = await websocket.receive_json()
msg = data.get("message", "")
user = data.get("HEADERS", {}).get("HX-Trigger-Name", "匿名")
# 广播给所有连接的客户端(HTML 片段)
html = f"""<div id="chat-messages" hx-swap-oob="beforeend">
<div class="msg">
<strong>{user}:</strong> {msg}
</div>
</div>"""
await room.broadcast(html)
except WebSocketDisconnect:
room.disconnect(websocket)
htmx 通过 WebSocket 发送表单数据时,同时会发送一个 HEADERS 字段,包含常规 htmx 请求头信息(HX-Request、HX-Trigger、HX-Current-URL 等),服务端可以用来识别请求来源和上下文。
Nginx 默认会缓冲后端响应,这会破坏 SSE 的实时性——数据会堆积在 Nginx 缓冲区而不是立即传给浏览器。SSE 端点的 FastAPI/Go 响应头中必须设置 X-Accel-Buffering: no,或在 Nginx 配置中针对该 location 添加 proxy_buffering off;。同理,如果用了 CDN,也需要确保 CDN 对 SSE 端点不做缓冲。
选择的关键问题:客户端是否需要向服务器发送数据?
- 只需服务器推送(通知、日志、进度条、实时数据看板)→ 选 SSE。HTTP 协议,自动重连,防火墙友好,实现更简单。
- 双向通信(聊天室、在线协作、游戏、实时投票)→ 选 WebSocket。真正全双工,延迟更低,但需要手动处理重连和心跳。
- AI 流式输出(ChatGPT 风格的打字机效果)→ 选 SSE。服务器一边生成一边推送,浏览器收一段显示一段。
SSE 与 WebSocket 的核心要点:① SSE 是单向(服务器→客户端)、基于 HTTP 的实时推送,浏览器原生支持自动重连,适合通知、日志流、进度更新等场景;② WebSocket 是双向全双工通信,适合聊天、协同编辑、游戏等需要客户端主动发送数据的场景;③ htmx 2.0 将 SSE 和 WebSocket 分拆为独立扩展(htmx-ext-sse、htmx-ext-ws),需单独引入;④ SSE 扩展中 sse-connect 建立连接,sse-swap 指定监听的事件名,多个子元素可监听同一连接上的不同事件;⑤ 服务器发送 SSE 时数据格式为 data: HTML内容\n\n,命名事件格式为 event: 事件名\ndata: 内容\n\n;⑥ Nginx 代理 SSE 时必须禁用缓冲(proxy_buffering off 或 X-Accel-Buffering: no)。