流式 RPC 的适用场景
| 流式模式 | proto 声明 | 适用场景 |
|---|---|---|
| 服务端流 | returns (stream T) | 大列表分页推送、实时日志推送、股票行情订阅 |
| 客户端流 | rpc Foo(stream T) returns (...) | 大文件分块上传、批量写入、传感器数据上报 |
服务端流:实时日志推送
Proto 定义
message WatchLogsRequest {
string service_name = 1;
string level = 2; // "info", "error", etc.
}
message LogEntry {
string level = 1;
string message = 2;
google.protobuf.Timestamp timestamp = 3;
}
service LogService {
// 服务端流:一个请求,多条响应
rpc WatchLogs(WatchLogsRequest) returns (stream LogEntry);
}
Go 服务端:使用 stream.Send() 推送
func (s *logServer) WatchLogs(
req *pb.WatchLogsRequest,
stream pb.LogService_WatchLogsServer,
) error {
log.Printf("Client watching logs for: %s", req.ServiceName)
// 模拟实时日志:每秒推送一条
for i := 0; i < 100; i++ {
// 检查客户端是否已断开
if err := stream.Context().Err(); err != nil {
log.Printf("Client disconnected: %v", err)
return nil
}
entry := &pb.LogEntry{
Level: req.Level,
Message: fmt.Sprintf("[%s] Log entry #%d from %s", req.Level, i, req.ServiceName),
Timestamp: timestamppb.Now(),
}
// 推送消息给客户端
if err := stream.Send(entry); err != nil {
return fmt.Errorf("failed to send log: %w", err)
}
time.Sleep(500 * time.Millisecond)
}
// 函数返回即表示流结束(EOF 自动发送)
return nil
}
Go 客户端:for 循环接收
func watchLogs(client pb.LogServiceClient) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
stream, err := client.WatchLogs(ctx, &pb.WatchLogsRequest{
ServiceName: "order-service",
Level: "error",
})
if err != nil {
log.Fatalf("WatchLogs failed: %v", err)
}
// 循环接收,直到流结束(io.EOF)
for {
entry, err := stream.Recv()
if err == io.EOF {
log.Println("Stream ended")
break
}
if err != nil {
log.Printf("Error: %v", err)
break
}
log.Printf("[%s] %s", entry.Level, entry.Message)
}
}
客户端流:大文件分块上传
Proto 定义
message FileChunk {
string filename = 1;
bytes data = 2; // 分块数据(推荐每块 64KB)
int64 offset = 3; // 在文件中的偏移
bool is_last = 4; // 是否是最后一块
}
message UploadResult {
string file_id = 1;
string url = 2;
int64 total_size = 3;
}
service FileService {
// 客户端流:多条请求,一条响应
rpc UploadFile(stream FileChunk) returns (UploadResult);
}
Go 服务端:接收流并汇总
func (s *fileServer) UploadFile(
stream pb.FileService_UploadFileServer,
) error {
var (
filename string
totalSize int64
buf bytes.Buffer
)
// 循环接收分块
for {
chunk, err := stream.Recv()
if err == io.EOF {
// 客户端已发送完毕,处理并响应
break
}
if err != nil {
return status.Errorf(codes.Internal,
"failed to receive chunk: %v", err)
}
if filename == "" {
filename = chunk.Filename
}
buf.Write(chunk.Data)
totalSize += int64(len(chunk.Data))
log.Printf("Received chunk: %d bytes (total: %d)",
len(chunk.Data), totalSize)
}
// 存储文件(此处省略实际存储逻辑)
fileID := uuid.New().String()
url := fmt.Sprintf("https://cdn.example.com/files/%s/%s",
fileID, filename)
// 发送最终响应(客户端流只有一次 SendAndClose)
return stream.SendAndClose(&pb.UploadResult{
FileId: fileID,
Url: url,
TotalSize: totalSize,
})
}
Go 客户端:分块发送文件
func uploadFile(client pb.FileServiceClient, filePath string) {
file, err := os.Open(filePath)
if err != nil {
log.Fatalf("failed to open file: %v", err)
}
defer file.Close()
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
stream, err := client.UploadFile(ctx)
if err != nil {
log.Fatalf("UploadFile failed: %v", err)
}
const chunkSize = 64 * 1024 // 64 KB
buf := make([]byte, chunkSize)
filename := filepath.Base(filePath)
var offset int64
for {
n, err := file.Read(buf)
if n > 0 {
chunk := &pb.FileChunk{
Filename: filename,
Data: buf[:n],
Offset: offset,
}
if sendErr := stream.Send(chunk); sendErr != nil {
log.Fatalf("Send failed: %v", sendErr)
}
offset += int64(n)
}
if err == io.EOF {
break
}
if err != nil {
log.Fatalf("Read error: %v", err)
}
}
// 发送完毕,等待服务端响应
result, err := stream.CloseAndRecv()
if err != nil {
log.Fatalf("CloseAndRecv failed: %v", err)
}
log.Printf("Upload complete! URL: %s, Size: %d bytes",
result.Url, result.TotalSize)
}
gRPC 消息大小限制:gRPC 默认单条消息最大为 4MB。传输大文件时必须分块(chunk),每块建议 64KB-1MB。不要尝试在一个 Unary RPC 中传输大文件——应使用客户端流模式。
流的关闭与错误处理
- io.EOF 流的正常结束信号。客户端 Recv() 返回 io.EOF 时表示服务端正常关闭了流;服务端 Recv() 返回 io.EOF 时表示客户端发送完毕。
- stream.Context().Err() 服务端应定期检查客户端是否已取消请求(context.Canceled)或超时(context.DeadlineExceeded),避免无谓计算。
- CloseAndRecv() 客户端流的客户端调用此方法表示"发送完毕,等待服务端响应"。对应服务端的 SendAndClose()。
- 背压控制 gRPC 基于 HTTP/2 的流量控制,当接收方缓冲满时,Send() 会自动阻塞(背压)。无需手动处理,但要注意避免在 Send() 前积累大量数据。
本章小结:服务端流适合"一次查询,持续推送"的场景;客户端流适合"批量上传,等待结果"的场景。流的正常关闭通过 io.EOF 信号传递,错误通过 gRPC 状态码传递。下一章我们将学习双向流,实现真正的全双工通信。