7.1 Subscription Procedure 原理
Subscription 与 Query/Mutation 的本质区别:Query 和 Mutation 是"请求-响应"模型(一次请求,一次响应),而 Subscription 是"持久连接"模型(一次订阅,服务端可多次推送)。
tRPC 的 Subscription 基于 Observable 模式(来自 RxJS 的概念,但 tRPC 自带轻量实现)。服务端通过 emit.next() 推送数据,客户端通过 onData 回调接收。
// 安装 WebSocket 依赖
pnpm add ws
pnpm add -D @types/ws
SHELL
7.2 服务端:WebSocket 适配器
tRPC 提供专用的 WebSocket 服务器适配器。通常在 Next.js 项目中,需要一个独立的 WebSocket 服务器进程(因为 Next.js 的 serverless 模式不支持持久连接):
// server/ws-server.ts — 独立 WebSocket 服务器
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './router';
import { createContext } from './context';
const wss = new WebSocketServer({ port: 3001 });
const handler = applyWSSHandler({
wss,
router: appRouter,
createContext,
// 保持连接心跳(避免超时断开)
keepAlive: {
enabled: true,
pingMs: 30000, // 每30秒发一次 ping
pongWaitMs: 5000, // 5秒内没收到 pong 则断开
},
});
wss.on('connection', (ws) => {
console.log('Client connected, total:', wss.clients.size);
ws.once('close', () => {
console.log('Client disconnected, total:', wss.clients.size);
});
});
process.on('SIGTERM', () => {
handler.broadcastReconnectNotification();
wss.close();
});
console.log('WebSocket server started on ws://localhost:3001');
TS
7.3 定义 Subscription Procedure
// server/routers/notification.ts
import { observable } from '@trpc/server/observable';
import { EventEmitter } from 'events';
import { z } from 'zod';
import { router, protectedProcedure } from '../trpc';
// 全局事件总线(实际生产中用 Redis Pub/Sub)
const ee = new EventEmitter();
ee.setMaxListeners(100); // 支持更多并发订阅者
export const notificationRouter = router({
// 订阅当前用户的通知
onNew: protectedProcedure
.subscription(({ ctx }) => {
return observable<{
id: string;
type: 'like' | 'comment' | 'follow';
message: string;
createdAt: Date;
}>((emit) => {
const userId = ctx.user.id;
const channel = `notification:${userId}`;
const onNotification = (data: Notification) => {
emit.next(data);
};
ee.on(channel, onNotification);
// 清理函数:客户端断开时执行
return () => ee.off(channel, onNotification);
});
}),
// 发送通知(触发订阅推送)
send: protectedProcedure
.input(z.object({
toUserId: z.string(),
type: z.enum(['like', 'comment', 'follow']),
message: z.string(),
}))
.mutation(async ({ input }) => {
const notification = {
id: crypto.randomUUID(),
...input,
createdAt: new Date(),
};
// 触发订阅推送
ee.emit(`notification:${input.toUserId}`, notification);
return notification;
}),
});
TS
7.4 客户端:WebSocket Link
// lib/trpc.ts — 客户端 WebSocket 配置
import { createWSClient, wsLink, splitLink, httpBatchLink } from '@trpc/client';
// 创建 WebSocket 客户端(懒初始化,首次使用时连接)
let wsClient: ReturnType<typeof createWSClient> | null = null;
function getWSClient() {
if (!wsClient) {
wsClient = createWSClient({
url: 'ws://localhost:3001',
onOpen() { console.log('WebSocket connected'); },
onClose() { console.log('WebSocket disconnected'); },
});
}
return wsClient;
}
const trpcClient = trpc.createClient({
links: [
// splitLink:Subscription 走 WebSocket,其他走 HTTP
splitLink({
condition: (op) => op.type === 'subscription',
true: wsLink({ client: getWSClient() }),
false: httpBatchLink({ url: '/api/trpc' }),
}),
],
});
TS
7.5 客户端:useSubscription Hook
'use client';
import { useState } from 'react';
import { trpc } from '@/lib/trpc';
export function NotificationBell() {
const [notifications, setNotifications] = useState<Notification[]>([]);
// 订阅实时通知
trpc.notification.onNew.useSubscription(undefined, {
onData(notification) {
// 每次服务端推送都调用
setNotifications(prev => [notification, ...prev]);
},
onError(err) {
console.error('Subscription error:', err);
},
// 断线重连配置
enabled: true,
});
return (
<div>
<span>通知 ({notifications.length})</span>
<ul>
{notifications.map(n => (
<li key={n.id}>{n.message}</li>
))}
</ul>
</div>
);
}
TSX
7.6 SSE(Server-Sent Events)替代方案
如果不想运行独立 WebSocket 服务器,可以使用 SSE(Server-Sent Events)。SSE 是单向的服务器推送,基于普通 HTTP,无需额外端口,与 Next.js Edge Runtime 完全兼容:
// app/api/notifications/route.ts — SSE 端点
import { auth } from '@/lib/auth';
export async function GET(req: Request) {
const session = await auth();
if (!session) return new Response('Unauthorized', { status: 401 });
const stream = new TransformStream();
const writer = stream.writable.getWriter();
const encoder = new TextEncoder();
const send = (data: object) => {
writer.write(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
};
// 订阅事件总线
const channel = `notification:${session.user.id}`;
ee.on(channel, send);
req.signal.addEventListener('abort', () => ee.off(channel, send));
return new Response(stream.readable, {
headers: {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
},
});
}
TS
生产环境注意单机内存 EventEmitter 无法在多实例部署(如 Vercel、多节点 K8s)中工作。生产环境应使用 Redis Pub/Sub(ioredis)替代 EventEmitter,确保跨实例的消息传递。
本章小结Subscription Procedure 基于 observable() 实现服务端主动推送。需要配合 WebSocket 服务器(applyWSSHandler)使用,客户端通过 splitLink 将订阅请求路由到 WebSocket,普通请求走 HTTP。SSE 是不需要双向通信时的轻量替代方案,兼容 Serverless 环境。生产环境的多节点推送需要 Redis Pub/Sub 作为消息总线。