Chapter 05

服务端流与客户端流

掌握单向流式 RPC 的实现模式,构建大文件传输与实时数据推送能力

流式 RPC 的适用场景

流式模式proto 声明适用场景
服务端流returns (stream T)大列表分页推送、实时日志推送、股票行情订阅
客户端流rpc Foo(stream T) returns (...)大文件分块上传、批量写入、传感器数据上报

服务端流:实时日志推送

Proto 定义

message WatchLogsRequest {
  string service_name = 1;
  string level        = 2;  // "info", "error", etc.
}

message LogEntry {
  string                    level     = 1;
  string                    message   = 2;
  google.protobuf.Timestamp timestamp = 3;
}

service LogService {
  // 服务端流:一个请求,多条响应
  rpc WatchLogs(WatchLogsRequest) returns (stream LogEntry);
}

Go 服务端:使用 stream.Send() 推送

func (s *logServer) WatchLogs(
  req *pb.WatchLogsRequest,
  stream pb.LogService_WatchLogsServer,
) error {
  log.Printf("Client watching logs for: %s", req.ServiceName)

  // 模拟实时日志:每秒推送一条
  for i := 0; i < 100; i++ {
    // 检查客户端是否已断开
    if err := stream.Context().Err(); err != nil {
      log.Printf("Client disconnected: %v", err)
      return nil
    }

    entry := &pb.LogEntry{
      Level:     req.Level,
      Message:   fmt.Sprintf("[%s] Log entry #%d from %s", req.Level, i, req.ServiceName),
      Timestamp: timestamppb.Now(),
    }

    // 推送消息给客户端
    if err := stream.Send(entry); err != nil {
      return fmt.Errorf("failed to send log: %w", err)
    }

    time.Sleep(500 * time.Millisecond)
  }

  // 函数返回即表示流结束(EOF 自动发送)
  return nil
}

Go 客户端:for 循环接收

func watchLogs(client pb.LogServiceClient) {
  ctx, cancel := context.WithCancel(context.Background())
  defer cancel()

  stream, err := client.WatchLogs(ctx, &pb.WatchLogsRequest{
    ServiceName: "order-service",
    Level:       "error",
  })
  if err != nil {
    log.Fatalf("WatchLogs failed: %v", err)
  }

  // 循环接收,直到流结束(io.EOF)
  for {
    entry, err := stream.Recv()
    if err == io.EOF {
      log.Println("Stream ended")
      break
    }
    if err != nil {
      log.Printf("Error: %v", err)
      break
    }
    log.Printf("[%s] %s", entry.Level, entry.Message)
  }
}

客户端流:大文件分块上传

Proto 定义

message FileChunk {
  string filename  = 1;
  bytes  data      = 2;  // 分块数据(推荐每块 64KB)
  int64  offset    = 3;  // 在文件中的偏移
  bool   is_last   = 4;  // 是否是最后一块
}

message UploadResult {
  string file_id    = 1;
  string url        = 2;
  int64  total_size = 3;
}

service FileService {
  // 客户端流:多条请求,一条响应
  rpc UploadFile(stream FileChunk) returns (UploadResult);
}

Go 服务端:接收流并汇总

func (s *fileServer) UploadFile(
  stream pb.FileService_UploadFileServer,
) error {
  var (
    filename  string
    totalSize int64
    buf       bytes.Buffer
  )

  // 循环接收分块
  for {
    chunk, err := stream.Recv()
    if err == io.EOF {
      // 客户端已发送完毕,处理并响应
      break
    }
    if err != nil {
      return status.Errorf(codes.Internal,
        "failed to receive chunk: %v", err)
    }

    if filename == "" {
      filename = chunk.Filename
    }
    buf.Write(chunk.Data)
    totalSize += int64(len(chunk.Data))
    log.Printf("Received chunk: %d bytes (total: %d)",
      len(chunk.Data), totalSize)
  }

  // 存储文件(此处省略实际存储逻辑)
  fileID := uuid.New().String()
  url := fmt.Sprintf("https://cdn.example.com/files/%s/%s",
    fileID, filename)

  // 发送最终响应(客户端流只有一次 SendAndClose)
  return stream.SendAndClose(&pb.UploadResult{
    FileId:    fileID,
    Url:       url,
    TotalSize: totalSize,
  })
}

Go 客户端:分块发送文件

func uploadFile(client pb.FileServiceClient, filePath string) {
  file, err := os.Open(filePath)
  if err != nil {
    log.Fatalf("failed to open file: %v", err)
  }
  defer file.Close()

  ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
  defer cancel()

  stream, err := client.UploadFile(ctx)
  if err != nil {
    log.Fatalf("UploadFile failed: %v", err)
  }

  const chunkSize = 64 * 1024  // 64 KB
  buf := make([]byte, chunkSize)
  filename := filepath.Base(filePath)
  var offset int64

  for {
    n, err := file.Read(buf)
    if n > 0 {
      chunk := &pb.FileChunk{
        Filename: filename,
        Data:     buf[:n],
        Offset:   offset,
      }
      if sendErr := stream.Send(chunk); sendErr != nil {
        log.Fatalf("Send failed: %v", sendErr)
      }
      offset += int64(n)
    }
    if err == io.EOF {
      break
    }
    if err != nil {
      log.Fatalf("Read error: %v", err)
    }
  }

  // 发送完毕,等待服务端响应
  result, err := stream.CloseAndRecv()
  if err != nil {
    log.Fatalf("CloseAndRecv failed: %v", err)
  }
  log.Printf("Upload complete! URL: %s, Size: %d bytes",
    result.Url, result.TotalSize)
}

gRPC 消息大小限制:gRPC 默认单条消息最大为 4MB。传输大文件时必须分块(chunk),每块建议 64KB-1MB。不要尝试在一个 Unary RPC 中传输大文件——应使用客户端流模式。

流的关闭与错误处理

本章小结:服务端流适合"一次查询,持续推送"的场景;客户端流适合"批量上传,等待结果"的场景。流的正常关闭通过 io.EOF 信号传递,错误通过 gRPC 状态码传递。下一章我们将学习双向流,实现真正的全双工通信。