Chapter 07

订阅与实时通信

基于 WebSocket 的 Subscription Procedure,构建实时推送应用

7.1 Subscription Procedure 原理

Subscription 与 Query/Mutation 的本质区别:Query 和 Mutation 是"请求-响应"模型(一次请求,一次响应),而 Subscription 是"持久连接"模型(一次订阅,服务端可多次推送)。

tRPC 的 Subscription 基于 Observable 模式(来自 RxJS 的概念,但 tRPC 自带轻量实现)。服务端通过 emit.next() 推送数据,客户端通过 onData 回调接收。

// 安装 WebSocket 依赖
pnpm add ws
pnpm add -D @types/ws
SHELL

7.2 服务端:WebSocket 适配器

tRPC 提供专用的 WebSocket 服务器适配器。通常在 Next.js 项目中,需要一个独立的 WebSocket 服务器进程(因为 Next.js 的 serverless 模式不支持持久连接):

// server/ws-server.ts — 独立 WebSocket 服务器
import { applyWSSHandler } from '@trpc/server/adapters/ws';
import { WebSocketServer } from 'ws';
import { appRouter } from './router';
import { createContext } from './context';

const wss = new WebSocketServer({ port: 3001 });

const handler = applyWSSHandler({
  wss,
  router: appRouter,
  createContext,
  // 保持连接心跳(避免超时断开)
  keepAlive: {
    enabled: true,
    pingMs: 30000,    // 每30秒发一次 ping
    pongWaitMs: 5000, // 5秒内没收到 pong 则断开
  },
});

wss.on('connection', (ws) => {
  console.log('Client connected, total:', wss.clients.size);
  ws.once('close', () => {
    console.log('Client disconnected, total:', wss.clients.size);
  });
});

process.on('SIGTERM', () => {
  handler.broadcastReconnectNotification();
  wss.close();
});

console.log('WebSocket server started on ws://localhost:3001');
TS

7.3 定义 Subscription Procedure

// server/routers/notification.ts
import { observable } from '@trpc/server/observable';
import { EventEmitter } from 'events';
import { z } from 'zod';
import { router, protectedProcedure } from '../trpc';

// 全局事件总线(实际生产中用 Redis Pub/Sub)
const ee = new EventEmitter();
ee.setMaxListeners(100); // 支持更多并发订阅者

export const notificationRouter = router({
  // 订阅当前用户的通知
  onNew: protectedProcedure
    .subscription(({ ctx }) => {
      return observable<{
        id: string;
        type: 'like' | 'comment' | 'follow';
        message: string;
        createdAt: Date;
      }>((emit) => {
        const userId = ctx.user.id;
        const channel = `notification:${userId}`;

        const onNotification = (data: Notification) => {
          emit.next(data);
        };

        ee.on(channel, onNotification);

        // 清理函数:客户端断开时执行
        return () => ee.off(channel, onNotification);
      });
    }),

  // 发送通知(触发订阅推送)
  send: protectedProcedure
    .input(z.object({
      toUserId: z.string(),
      type: z.enum(['like', 'comment', 'follow']),
      message: z.string(),
    }))
    .mutation(async ({ input }) => {
      const notification = {
        id: crypto.randomUUID(),
        ...input,
        createdAt: new Date(),
      };
      // 触发订阅推送
      ee.emit(`notification:${input.toUserId}`, notification);
      return notification;
    }),
});
TS

7.4 客户端:WebSocket Link

// lib/trpc.ts — 客户端 WebSocket 配置
import { createWSClient, wsLink, splitLink, httpBatchLink } from '@trpc/client';

// 创建 WebSocket 客户端(懒初始化,首次使用时连接)
let wsClient: ReturnType<typeof createWSClient> | null = null;

function getWSClient() {
  if (!wsClient) {
    wsClient = createWSClient({
      url: 'ws://localhost:3001',
      onOpen() { console.log('WebSocket connected'); },
      onClose() { console.log('WebSocket disconnected'); },
    });
  }
  return wsClient;
}

const trpcClient = trpc.createClient({
  links: [
    // splitLink:Subscription 走 WebSocket,其他走 HTTP
    splitLink({
      condition: (op) => op.type === 'subscription',
      true: wsLink({ client: getWSClient() }),
      false: httpBatchLink({ url: '/api/trpc' }),
    }),
  ],
});
TS

7.5 客户端:useSubscription Hook

'use client';
import { useState } from 'react';
import { trpc } from '@/lib/trpc';

export function NotificationBell() {
  const [notifications, setNotifications] = useState<Notification[]>([]);

  // 订阅实时通知
  trpc.notification.onNew.useSubscription(undefined, {
    onData(notification) {
      // 每次服务端推送都调用
      setNotifications(prev => [notification, ...prev]);
    },
    onError(err) {
      console.error('Subscription error:', err);
    },
    // 断线重连配置
    enabled: true,
  });

  return (
    <div>
      <span>通知 ({notifications.length})</span>
      <ul>
        {notifications.map(n => (
          <li key={n.id}>{n.message}</li>
        ))}
      </ul>
    </div>
  );
}
TSX

7.6 SSE(Server-Sent Events)替代方案

如果不想运行独立 WebSocket 服务器,可以使用 SSE(Server-Sent Events)。SSE 是单向的服务器推送,基于普通 HTTP,无需额外端口,与 Next.js Edge Runtime 完全兼容:

// app/api/notifications/route.ts — SSE 端点
import { auth } from '@/lib/auth';

export async function GET(req: Request) {
  const session = await auth();
  if (!session) return new Response('Unauthorized', { status: 401 });

  const stream = new TransformStream();
  const writer = stream.writable.getWriter();
  const encoder = new TextEncoder();

  const send = (data: object) => {
    writer.write(encoder.encode(`data: ${JSON.stringify(data)}\n\n`));
  };

  // 订阅事件总线
  const channel = `notification:${session.user.id}`;
  ee.on(channel, send);
  req.signal.addEventListener('abort', () => ee.off(channel, send));

  return new Response(stream.readable, {
    headers: {
      'Content-Type': 'text/event-stream',
      'Cache-Control': 'no-cache',
      'Connection': 'keep-alive',
    },
  });
}
TS
⚠️

生产环境注意单机内存 EventEmitter 无法在多实例部署(如 Vercel、多节点 K8s)中工作。生产环境应使用 Redis Pub/Sub(ioredis)替代 EventEmitter,确保跨实例的消息传递。

💡

本章小结Subscription Procedure 基于 observable() 实现服务端主动推送。需要配合 WebSocket 服务器(applyWSSHandler)使用,客户端通过 splitLink 将订阅请求路由到 WebSocket,普通请求走 HTTP。SSE 是不需要双向通信时的轻量替代方案,兼容 Serverless 环境。生产环境的多节点推送需要 Redis Pub/Sub 作为消息总线。