双向流的工作模型
双向流(Bidirectional Streaming)允许客户端和服务端在同一个 RPC 连接上同时发送消息,互不等待。这是 gRPC 最强大但也最复杂的通信模式。
双向流示意图:
Client Server
│ ───── Message 1 ──────► │
│ ◄──── Response A ───── │
│ ───── Message 2 ──────► │
│ ───── Message 3 ──────► │ (不等待响应,继续发送)
│ ◄──── Response B ───── │
│ ◄──── Response C ───── │ (服务端也可主动推送)
│ ─── EOF/Close ────────► │
│ ◄──── EOF ──────────── │
实战:实时聊天服务
Proto 定义
message ChatMessage {
string user_id = 1;
string content = 2;
string room_id = 3;
google.protobuf.Timestamp sent_at = 4;
}
service ChatService {
// 双向流:客户端发消息,服务端广播给所有人
rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
Go 服务端:广播聊天室
type chatServer struct {
pb.UnimplementedChatServiceServer
mu sync.RWMutex
clients map[string]pb.ChatService_ChatServer
}
func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
// 获取用户 ID(从 Metadata 中)
md, _ := metadata.FromIncomingContext(stream.Context())
userID := extractUserID(md)
// 注册客户端
s.mu.Lock()
s.clients[userID] = stream
s.mu.Unlock()
defer func() {
s.mu.Lock()
delete(s.clients, userID)
s.mu.Unlock()
log.Printf("User %s disconnected", userID)
}()
log.Printf("User %s connected", userID)
// 循环接收消息并广播
for {
msg, err := stream.Recv()
if err == io.EOF {
return nil // 客户端正常关闭
}
if err != nil {
return err // 连接中断
}
log.Printf("[%s] %s: %s", msg.RoomId, msg.UserId, msg.Content)
// 广播给同房间的所有其他用户
s.broadcast(msg, userID)
}
}
func (s *chatServer) broadcast(msg *pb.ChatMessage, senderID string) {
s.mu.RLock()
defer s.mu.RUnlock()
for userID, clientStream := range s.clients {
if userID == senderID {
continue // 不发给自己
}
if err := clientStream.Send(msg); err != nil {
log.Printf("Failed to send to %s: %v", userID, err)
}
}
}
Go 客户端:并发收发
func joinChat(client pb.ChatServiceClient, userID, roomID string) {
// 在 Metadata 中携带用户信息
md := metadata.Pairs("user-id", userID)
ctx := metadata.NewOutgoingContext(context.Background(), md)
stream, err := client.Chat(ctx)
if err != nil {
log.Fatalf("Chat failed: %v", err)
}
var wg sync.WaitGroup
// goroutine 1:接收来自服务端的消息
wg.Add(1)
go func() {
defer wg.Done()
for {
msg, err := stream.Recv()
if err == io.EOF {
return
}
if err != nil {
log.Printf("Recv error: %v", err)
return
}
fmt.Printf("\r[%s] %s\n> ", msg.UserId, msg.Content)
}
}()
// goroutine 2(主线程):读取用户输入并发送
scanner := bufio.NewScanner(os.Stdin)
for {
fmt.Print("> ")
if !scanner.Scan() {
break
}
text := scanner.Text()
if text == "/quit" {
break
}
err := stream.Send(&pb.ChatMessage{
UserId: userID,
Content: text,
RoomId: roomID,
SentAt: timestamppb.Now(),
})
if err != nil {
log.Printf("Send error: %v", err)
break
}
}
// 关闭发送方向,等待服务端完成
stream.CloseSend()
wg.Wait()
}
实战:远程命令执行
Proto 定义
message ExecRequest {
string command = 1; // 命令输入(可持续发送)
bool eof = 2; // 发送完毕标志
}
message ExecResponse {
string output = 1; // 命令输出(实时推送)
bool is_error = 2; // 是否是 stderr
int32 exit_code = 3; // 进程退出码(完成时)
}
service RemoteExecService {
rpc Execute(stream ExecRequest) returns (stream ExecResponse);
}
Go 服务端:实时输出
func (s *execServer) Execute(
stream pb.RemoteExecService_ExecuteServer,
) error {
// 接收第一条消息获取命令
req, err := stream.Recv()
if err != nil {
return err
}
// 启动进程
cmd := exec.CommandContext(stream.Context(),
"bash", "-c", req.Command)
stdout, _ := cmd.StdoutPipe()
stderr, _ := cmd.StderrPipe()
if err := cmd.Start(); err != nil {
return status.Errorf(codes.Internal, "start: %v", err)
}
// 并发读取 stdout 和 stderr,实时发送给客户端
var wg sync.WaitGroup
for _, pipe := range []struct{ io.Reader; bool}{
{stdout, false}, {stderr, true},
} {
p := pipe
wg.Add(1)
go func() {
defer wg.Done()
scanner := bufio.NewScanner(p.Reader)
for scanner.Scan() {
stream.Send(&pb.ExecResponse{
Output: scanner.Text() + "\n",
IsError: p.bool,
})
}
}()
}
wg.Wait()
exitCode := 0
if err := cmd.Wait(); err != nil {
if exitErr, ok := err.(*exec.ExitError); ok {
exitCode = exitErr.ExitCode()
}
}
// 发送退出码
return stream.Send(&pb.ExecResponse{ExitCode: int32(exitCode)})
}
背压控制
gRPC 基于 HTTP/2 的流量控制机制,当接收方处理速度跟不上发送方时,会自动触发背压(Backpressure):
- 流量控制窗口:HTTP/2 每个流和连接都有接收窗口(默认 64KB),接收方消费数据后才会发送 WINDOW_UPDATE 帧扩大窗口
- 自动阻塞:当窗口满时,
stream.Send()会自动阻塞,无需开发者手动处理 - gRPC 配置:可通过
grpc.WithInitialWindowSize()和grpc.WithInitialConnWindowSize()调整窗口大小
// 调整流量控制窗口(吞吐量敏感的服务)
conn, _ := grpc.NewClient("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithInitialWindowSize(1 << 20), // 1MB 流窗口
grpc.WithInitialConnWindowSize(1 << 23), // 8MB 连接窗口
)
Send() 不是线程安全的:在同一个流上,stream.Send() 不是并发安全的。如果多个 goroutine 需要向同一个流发送消息,必须使用 mutex 保护,或者通过 channel 序列化发送操作。
本章小结:双向流是 gRPC 中最灵活的通信模式,通过两个独立的 goroutine 分别处理收和发,可以实现真正的全双工通信。核心要点:接收用 for+Recv(),发送用 Send(),CloseSend() 通知对端"我发完了",io.EOF 代表"对端也发完了"。