Chapter 06

双向流 RPC

全双工通信的终极形态——实现实时聊天服务与远程命令执行,掌握并发收发的 goroutine 架构

双向流的工作模型

双向流(Bidirectional Streaming)允许客户端和服务端在同一个 RPC 连接上同时发送消息,互不等待。这是 gRPC 最强大但也最复杂的通信模式。

双向流示意图:

  Client                    Server
    │  ───── Message 1 ──────► │
    │  ◄──── Response A ─────  │
    │  ───── Message 2 ──────► │
    │  ───── Message 3 ──────► │  (不等待响应,继续发送)
    │  ◄──── Response B ─────  │
    │  ◄──── Response C ─────  │  (服务端也可主动推送)
    │  ─── EOF/Close ────────► │
    │  ◄──── EOF ────────────  │

实战:实时聊天服务

Proto 定义

message ChatMessage {
  string                    user_id   = 1;
  string                    content   = 2;
  string                    room_id   = 3;
  google.protobuf.Timestamp sent_at   = 4;
}

service ChatService {
  // 双向流:客户端发消息,服务端广播给所有人
  rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}

Go 服务端:广播聊天室

type chatServer struct {
  pb.UnimplementedChatServiceServer
  mu      sync.RWMutex
  clients map[string]pb.ChatService_ChatServer
}

func (s *chatServer) Chat(stream pb.ChatService_ChatServer) error {
  // 获取用户 ID(从 Metadata 中)
  md, _ := metadata.FromIncomingContext(stream.Context())
  userID := extractUserID(md)

  // 注册客户端
  s.mu.Lock()
  s.clients[userID] = stream
  s.mu.Unlock()

  defer func() {
    s.mu.Lock()
    delete(s.clients, userID)
    s.mu.Unlock()
    log.Printf("User %s disconnected", userID)
  }()

  log.Printf("User %s connected", userID)

  // 循环接收消息并广播
  for {
    msg, err := stream.Recv()
    if err == io.EOF {
      return nil  // 客户端正常关闭
    }
    if err != nil {
      return err  // 连接中断
    }

    log.Printf("[%s] %s: %s", msg.RoomId, msg.UserId, msg.Content)

    // 广播给同房间的所有其他用户
    s.broadcast(msg, userID)
  }
}

func (s *chatServer) broadcast(msg *pb.ChatMessage, senderID string) {
  s.mu.RLock()
  defer s.mu.RUnlock()

  for userID, clientStream := range s.clients {
    if userID == senderID {
      continue  // 不发给自己
    }
    if err := clientStream.Send(msg); err != nil {
      log.Printf("Failed to send to %s: %v", userID, err)
    }
  }
}

Go 客户端:并发收发

func joinChat(client pb.ChatServiceClient, userID, roomID string) {
  // 在 Metadata 中携带用户信息
  md := metadata.Pairs("user-id", userID)
  ctx := metadata.NewOutgoingContext(context.Background(), md)

  stream, err := client.Chat(ctx)
  if err != nil {
    log.Fatalf("Chat failed: %v", err)
  }

  var wg sync.WaitGroup

  // goroutine 1:接收来自服务端的消息
  wg.Add(1)
  go func() {
    defer wg.Done()
    for {
      msg, err := stream.Recv()
      if err == io.EOF {
        return
      }
      if err != nil {
        log.Printf("Recv error: %v", err)
        return
      }
      fmt.Printf("\r[%s] %s\n> ", msg.UserId, msg.Content)
    }
  }()

  // goroutine 2(主线程):读取用户输入并发送
  scanner := bufio.NewScanner(os.Stdin)
  for {
    fmt.Print("> ")
    if !scanner.Scan() {
      break
    }
    text := scanner.Text()
    if text == "/quit" {
      break
    }

    err := stream.Send(&pb.ChatMessage{
      UserId:  userID,
      Content: text,
      RoomId:  roomID,
      SentAt:  timestamppb.Now(),
    })
    if err != nil {
      log.Printf("Send error: %v", err)
      break
    }
  }

  // 关闭发送方向,等待服务端完成
  stream.CloseSend()
  wg.Wait()
}

实战:远程命令执行

Proto 定义

message ExecRequest {
  string command = 1;  // 命令输入(可持续发送)
  bool   eof     = 2;  // 发送完毕标志
}

message ExecResponse {
  string output   = 1;  // 命令输出(实时推送)
  bool   is_error = 2;  // 是否是 stderr
  int32  exit_code = 3; // 进程退出码(完成时)
}

service RemoteExecService {
  rpc Execute(stream ExecRequest) returns (stream ExecResponse);
}

Go 服务端:实时输出

func (s *execServer) Execute(
  stream pb.RemoteExecService_ExecuteServer,
) error {
  // 接收第一条消息获取命令
  req, err := stream.Recv()
  if err != nil {
    return err
  }

  // 启动进程
  cmd := exec.CommandContext(stream.Context(),
    "bash", "-c", req.Command)

  stdout, _ := cmd.StdoutPipe()
  stderr, _ := cmd.StderrPipe()

  if err := cmd.Start(); err != nil {
    return status.Errorf(codes.Internal, "start: %v", err)
  }

  // 并发读取 stdout 和 stderr,实时发送给客户端
  var wg sync.WaitGroup
  for _, pipe := range []struct{ io.Reader; bool}{
    {stdout, false}, {stderr, true},
  } {
    p := pipe
    wg.Add(1)
    go func() {
      defer wg.Done()
      scanner := bufio.NewScanner(p.Reader)
      for scanner.Scan() {
        stream.Send(&pb.ExecResponse{
          Output:  scanner.Text() + "\n",
          IsError: p.bool,
        })
      }
    }()
  }

  wg.Wait()
  exitCode := 0
  if err := cmd.Wait(); err != nil {
    if exitErr, ok := err.(*exec.ExitError); ok {
      exitCode = exitErr.ExitCode()
    }
  }

  // 发送退出码
  return stream.Send(&pb.ExecResponse{ExitCode: int32(exitCode)})
}

背压控制

gRPC 基于 HTTP/2 的流量控制机制,当接收方处理速度跟不上发送方时,会自动触发背压(Backpressure):

// 调整流量控制窗口(吞吐量敏感的服务)
conn, _ := grpc.NewClient("localhost:50051",
  grpc.WithTransportCredentials(insecure.NewCredentials()),
  grpc.WithInitialWindowSize(1 << 20),       // 1MB 流窗口
  grpc.WithInitialConnWindowSize(1 << 23),   // 8MB 连接窗口
)

Send() 不是线程安全的:在同一个流上,stream.Send() 不是并发安全的。如果多个 goroutine 需要向同一个流发送消息,必须使用 mutex 保护,或者通过 channel 序列化发送操作。

本章小结:双向流是 gRPC 中最灵活的通信模式,通过两个独立的 goroutine 分别处理收和发,可以实现真正的全双工通信。核心要点:接收用 for+Recv(),发送用 Send(),CloseSend() 通知对端"我发完了",io.EOF 代表"对端也发完了"。