Chapter 05

Subscription:实时数据推送

通过 WebSocket 实现服务端主动推送,打造实时应用

Subscription 的工作机制

GraphQL Subscription 基于 WebSocket 长连接实现。与 Query/Mutation 的请求-响应模式不同,Subscription 建立持久连接后,服务端在数据变化时主动推送给客户端。

Client Server | | |-- WebSocket Upgrade -----> | |<--- 101 Switching Prots -- | | | |-- connection_init -------> | |<-- connection_ack -------- | | | |-- subscribe (msg_id=1) --> | 订阅开始 | | |<-- next (data: {...}) ---- | 新消息推送 |<-- next (data: {...}) ---- | 新消息推送 | | |-- complete (msg_id=1) ---> | 取消订阅 |-- connection_close ------> |

Schema 定义

type Message {
  id: ID!
  content: String!
  sender: User!
  room: Room!
  createdAt: DateTime!
}

type Subscription {
  # 订阅特定房间的新消息
  messageSent(roomId: ID!): Message!

  # 订阅用户状态变化
  userOnlineStatus(userId: ID!): UserStatus!

  # 订阅所有新通知(无参数)
  notificationReceived: Notification!
}

服务器设置(Apollo Server + graphql-ws)

import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { PubSub } from 'graphql-subscriptions';
import express from 'express';
import http from 'http';

const pubsub = new PubSub();
const MESSAGE_SENT = 'MESSAGE_SENT';

const resolvers = {
  Subscription: {
    messageSent: {
      // subscribe 返回 AsyncIterator
      subscribe: (_, { roomId }) => {
        return pubsub.asyncIterator([`MESSAGE_SENT_${roomId}`]);
      },
    },
  },
  Mutation: {
    sendMessage: async (_, { input }, { db }) => {
      const message = await db.messages.create({ data: input });
      // 触发订阅推送
      await pubsub.publish(`MESSAGE_SENT_${input.roomId}`, {
        messageSent: message,
      });
      return message;
    },
  },
};

const schema = makeExecutableSchema({ typeDefs, resolvers });
const app = express();
const httpServer = http.createServer(app);

// WebSocket 服务器(处理 Subscription)
const wsServer = new WebSocketServer({
  server: httpServer,
  path: '/graphql',
});
const serverCleanup = useServer({ schema }, wsServer);

const server = new ApolloServer({
  schema,
  plugins: [{
    async serverWillStart() {
      return {
        async drainServer() { await serverCleanup.dispose(); },
      };
    },
  }],
});

await server.start();
app.use('/graphql', expressMiddleware(server));
httpServer.listen(4000);

withFilter:过滤推送

import { withFilter } from 'graphql-subscriptions';

const resolvers = {
  Subscription: {
    messageSent: {
      // withFilter 只推送满足条件的事件
      subscribe: withFilter(
        // 订阅所有消息事件
        () => pubsub.asyncIterator([MESSAGE_SENT]),
        // 过滤:只推送目标 roomId 的消息
        (payload, variables) => {
          return payload.messageSent.roomId === variables.roomId;
        }
      ),
    },
  },
};

Redis PubSub(生产环境)

内置的 PubSub 是基于内存的,只适合单机环境。生产中需要使用 Redis PubSub 以支持多实例部署:

import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';

const pubsub = new RedisPubSub({
  publisher: new Redis({ host: 'redis', port: 6379 }),
  subscriber: new Redis({ host: 'redis', port: 6379 }),
});
// Redis PubSub 与内置 PubSub API 完全一致,无需修改 Resolver

客户端订阅(Apollo Client)

import { useSubscription, gql } from '@apollo/client';

const MESSAGE_SENT = gql`
  subscription MessageSent($roomId: ID!) {
    messageSent(roomId: $roomId) {
      id
      content
      sender { name avatar }
      createdAt
    }
  }
`;

function ChatRoom({ roomId }: { roomId: string }) {
  const [messages, setMessages] = useState([]);

  useSubscription(MESSAGE_SENT, {
    variables: { roomId },
    onData({ data }) {
      if (data.data?.messageSent) {
        setMessages(prev => [...prev, data.data!.messageSent]);
      }
    },
  });

  return <div>{messages.map(m => <Message key={m.id} {...m} />)}</div>;
}

Subscription 核心概念

AsyncIterator(异步迭代器)
Subscription Resolver 的 subscribe 函数必须返回一个 AsyncIterator——它是一个可以异步迭代的对象,每次调用 next() 都会等待下一个事件。PubSub.asyncIterator(['TOPIC']) 返回这样的迭代器,当有人调用 pubsub.publish('TOPIC', data) 时,迭代器产生新值并推送给客户端。
PubSub(发布/订阅)
事件中间件:Mutation 调用 publish 向特定频道发布事件;Subscription 的 asyncIterator 监听该频道。内置 PubSub 基于内存,单机开发使用;生产环境需要 RedisPubSub,支持多实例水平扩展。
graphql-ws 协议
WebSocket 上的 GraphQL 标准协议(取代旧版 subscriptions-transport-ws)。消息类型:connection_init/ack(握手)、subscribe(开始订阅)、next(推送数据)、error(错误)、complete(结束)。确保客户端和服务器使用相同版本的协议库。
withFilter 过滤器
当所有 Subscription 共享同一 PubSub 频道时,withFilter 决定哪些事件应推送给当前订阅者。第二个参数是判断函数:return payload.roomId === variables.roomId——只有房间 ID 匹配时才推送,避免每个客户端收到全部消息。

Subscription 授权认证

// WebSocket 连接时也需要验证 Token
const serverCleanup = useServer(
  {
    schema,
    // onConnect 在握手时调用,验证 Token
    onConnect: async (ctx) => {
      const token = ctx.connectionParams?.authorization as string;
      const user = await getUserFromToken(token);
      if (!user) {
        throw new Error('Unauthorized');
        // 抛出错误会关闭 WebSocket 连接
      }
    },
    // 将用户注入到 context 中(每个 Subscription 请求都会调用)
    context: async (ctx) => {
      const token = ctx.connectionParams?.authorization as string;
      const user = await getUserFromToken(token);
      return { user, db, pubsub };
    },
  },
  wsServer
);

// 客户端发送 Token(Apollo Client)
const wsLink = new GraphQLWsLink(createClient({
  url: 'ws://localhost:4000/graphql',
  connectionParams: {
    // 在连接参数中携带 Token
    authorization: `Bearer ${localStorage.getItem('token')}`,
  },
}));
Subscription vs SSE vs WebSocket(原生)

GraphQL Subscription 封装了 WebSocket,提供了自动重连、多路复用(单连接多订阅)和标准的 graphql-ws 协议。与直接用原生 WebSocket 相比,它有更好的开发体验;与 SSE(Server-Sent Events)相比,它支持双向通信,客户端也可以发送消息(如心跳 ping)。

本章小结

本章核心要点