Subscription 的工作机制
GraphQL Subscription 基于 WebSocket 长连接实现。与 Query/Mutation 的请求-响应模式不同,Subscription 建立持久连接后,服务端在数据变化时主动推送给客户端。
Client Server
| |
|-- WebSocket Upgrade -----> |
|<--- 101 Switching Prots -- |
| |
|-- connection_init -------> |
|<-- connection_ack -------- |
| |
|-- subscribe (msg_id=1) --> | 订阅开始
| |
|<-- next (data: {...}) ---- | 新消息推送
|<-- next (data: {...}) ---- | 新消息推送
| |
|-- complete (msg_id=1) ---> | 取消订阅
|-- connection_close ------> |
Schema 定义
type Message {
id: ID!
content: String!
sender: User!
room: Room!
createdAt: DateTime!
}
type Subscription {
# 订阅特定房间的新消息
messageSent(roomId: ID!): Message!
# 订阅用户状态变化
userOnlineStatus(userId: ID!): UserStatus!
# 订阅所有新通知(无参数)
notificationReceived: Notification!
}
服务器设置(Apollo Server + graphql-ws)
import { ApolloServer } from '@apollo/server';
import { expressMiddleware } from '@apollo/server/express4';
import { makeExecutableSchema } from '@graphql-tools/schema';
import { WebSocketServer } from 'ws';
import { useServer } from 'graphql-ws/lib/use/ws';
import { PubSub } from 'graphql-subscriptions';
import express from 'express';
import http from 'http';
const pubsub = new PubSub();
const MESSAGE_SENT = 'MESSAGE_SENT';
const resolvers = {
Subscription: {
messageSent: {
// subscribe 返回 AsyncIterator
subscribe: (_, { roomId }) => {
return pubsub.asyncIterator([`MESSAGE_SENT_${roomId}`]);
},
},
},
Mutation: {
sendMessage: async (_, { input }, { db }) => {
const message = await db.messages.create({ data: input });
// 触发订阅推送
await pubsub.publish(`MESSAGE_SENT_${input.roomId}`, {
messageSent: message,
});
return message;
},
},
};
const schema = makeExecutableSchema({ typeDefs, resolvers });
const app = express();
const httpServer = http.createServer(app);
// WebSocket 服务器(处理 Subscription)
const wsServer = new WebSocketServer({
server: httpServer,
path: '/graphql',
});
const serverCleanup = useServer({ schema }, wsServer);
const server = new ApolloServer({
schema,
plugins: [{
async serverWillStart() {
return {
async drainServer() { await serverCleanup.dispose(); },
};
},
}],
});
await server.start();
app.use('/graphql', expressMiddleware(server));
httpServer.listen(4000);
withFilter:过滤推送
import { withFilter } from 'graphql-subscriptions';
const resolvers = {
Subscription: {
messageSent: {
// withFilter 只推送满足条件的事件
subscribe: withFilter(
// 订阅所有消息事件
() => pubsub.asyncIterator([MESSAGE_SENT]),
// 过滤:只推送目标 roomId 的消息
(payload, variables) => {
return payload.messageSent.roomId === variables.roomId;
}
),
},
},
};
Redis PubSub(生产环境)
内置的 PubSub 是基于内存的,只适合单机环境。生产中需要使用 Redis PubSub 以支持多实例部署:
import { RedisPubSub } from 'graphql-redis-subscriptions';
import Redis from 'ioredis';
const pubsub = new RedisPubSub({
publisher: new Redis({ host: 'redis', port: 6379 }),
subscriber: new Redis({ host: 'redis', port: 6379 }),
});
// Redis PubSub 与内置 PubSub API 完全一致,无需修改 Resolver
客户端订阅(Apollo Client)
import { useSubscription, gql } from '@apollo/client';
const MESSAGE_SENT = gql`
subscription MessageSent($roomId: ID!) {
messageSent(roomId: $roomId) {
id
content
sender { name avatar }
createdAt
}
}
`;
function ChatRoom({ roomId }: { roomId: string }) {
const [messages, setMessages] = useState([]);
useSubscription(MESSAGE_SENT, {
variables: { roomId },
onData({ data }) {
if (data.data?.messageSent) {
setMessages(prev => [...prev, data.data!.messageSent]);
}
},
});
return <div>{messages.map(m => <Message key={m.id} {...m} />)}</div>;
}
Subscription 核心概念
AsyncIterator(异步迭代器)
Subscription Resolver 的 subscribe 函数必须返回一个 AsyncIterator——它是一个可以异步迭代的对象,每次调用 next() 都会等待下一个事件。PubSub.asyncIterator(['TOPIC']) 返回这样的迭代器,当有人调用 pubsub.publish('TOPIC', data) 时,迭代器产生新值并推送给客户端。
PubSub(发布/订阅)
事件中间件:Mutation 调用 publish 向特定频道发布事件;Subscription 的 asyncIterator 监听该频道。内置 PubSub 基于内存,单机开发使用;生产环境需要 RedisPubSub,支持多实例水平扩展。
graphql-ws 协议
WebSocket 上的 GraphQL 标准协议(取代旧版 subscriptions-transport-ws)。消息类型:connection_init/ack(握手)、subscribe(开始订阅)、next(推送数据)、error(错误)、complete(结束)。确保客户端和服务器使用相同版本的协议库。
withFilter 过滤器
当所有 Subscription 共享同一 PubSub 频道时,withFilter 决定哪些事件应推送给当前订阅者。第二个参数是判断函数:return payload.roomId === variables.roomId——只有房间 ID 匹配时才推送,避免每个客户端收到全部消息。
Subscription 授权认证
// WebSocket 连接时也需要验证 Token
const serverCleanup = useServer(
{
schema,
// onConnect 在握手时调用,验证 Token
onConnect: async (ctx) => {
const token = ctx.connectionParams?.authorization as string;
const user = await getUserFromToken(token);
if (!user) {
throw new Error('Unauthorized');
// 抛出错误会关闭 WebSocket 连接
}
},
// 将用户注入到 context 中(每个 Subscription 请求都会调用)
context: async (ctx) => {
const token = ctx.connectionParams?.authorization as string;
const user = await getUserFromToken(token);
return { user, db, pubsub };
},
},
wsServer
);
// 客户端发送 Token(Apollo Client)
const wsLink = new GraphQLWsLink(createClient({
url: 'ws://localhost:4000/graphql',
connectionParams: {
// 在连接参数中携带 Token
authorization: `Bearer ${localStorage.getItem('token')}`,
},
}));
Subscription vs SSE vs WebSocket(原生)
GraphQL Subscription 封装了 WebSocket,提供了自动重连、多路复用(单连接多订阅)和标准的 graphql-ws 协议。与直接用原生 WebSocket 相比,它有更好的开发体验;与 SSE(Server-Sent Events)相比,它支持双向通信,客户端也可以发送消息(如心跳 ping)。
本章小结
本章核心要点
- Subscription 三操作类型区别:Query(一次性请求-响应)、Mutation(一次性写操作)、Subscription(持久连接,服务端主动推送);三者在同一 GraphQL 端点,Query/Mutation 走 HTTP,Subscription 走 WebSocket。
- AsyncIterator 机制:subscribe 函数返回异步迭代器;PubSub.asyncIterator 监听指定频道;Mutation 调用 pubsub.publish 触发推送——发布-订阅模式解耦写操作和订阅逻辑。
- 内置 PubSub vs Redis PubSub:内置 PubSub 存于进程内存,单实例开发可用;生产多实例部署必须用 RedisPubSub,确保所有实例共享同一消息总线,API 完全兼容。
- withFilter 按需推送:避免广播所有事件——用过滤函数只推送满足条件的事件(如同房间消息),减少无效 WebSocket 传输,降低客户端处理负担。
- WebSocket 认证:HTTP 请求头认证对 WebSocket 不适用;需在 connectionParams 中携带 Token,服务端在 onConnect 钩子中验证——连接失败直接关闭 WebSocket。
- 架构注意事项:Apollo Server 同一端口同时监听 HTTP 和 WebSocket,需使用 httpServer 共享端口;服务关闭时需 drainServer 优雅断开所有 WebSocket 连接,避免客户端报错。