Chapter 07

拦截器与中间件

在 RPC 调用链中注入日志、认证、限流等横切关注点,不污染业务代码

拦截器类型

gRPC 有四种拦截器,覆盖服务端/客户端 × Unary/Stream 四个维度:

拦截器类型类型签名用途
UnaryServerInterceptor服务端 Unary RPC 拦截日志、认证、限流
StreamServerInterceptor服务端 Stream RPC 拦截流的认证、监控
UnaryClientInterceptor客户端 Unary RPC 拦截重试、链路追踪
StreamClientInterceptor客户端 Stream RPC 拦截重连、监控

Unary 服务端拦截器

// 拦截器函数签名
type UnaryServerInterceptor func(
  ctx   context.Context,
  req   interface{},         // 请求消息
  info  *grpc.UnaryServerInfo, // 方法信息
  handler grpc.UnaryHandler,  // 实际处理函数
) (interface{}, error)

日志拦截器(记录请求耗时)

func LoggingInterceptor(
  ctx     context.Context,
  req     interface{},
  info    *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (interface{}, error) {
  start := time.Now()

  // 调用实际处理函数
  resp, err := handler(ctx, req)

  duration := time.Since(start)
  code := codes.OK
  if err != nil {
    code = status.Code(err)
  }

  log.Printf(
    "gRPC | method=%-50s | code=%-15s | duration=%v",
    info.FullMethod, code, duration,
  )

  return resp, err
}

JWT 认证拦截器

func AuthInterceptor(
  ctx     context.Context,
  req     interface{},
  info    *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (interface{}, error) {
  // 白名单:不需要认证的方法
  publicMethods := map[string]bool{
    "/auth.v1.AuthService/Login":    true,
    "/auth.v1.AuthService/Register": true,
  }
  if publicMethods[info.FullMethod] {
    return handler(ctx, req)
  }

  // 从 Metadata 中提取 Authorization header
  md, ok := metadata.FromIncomingContext(ctx)
  if !ok {
    return nil, status.Error(codes.Unauthenticated, "missing metadata")
  }

  authValues := md.Get("authorization")
  if len(authValues) == 0 {
    return nil, status.Error(codes.Unauthenticated, "missing token")
  }

  tokenStr := strings.TrimPrefix(authValues[0], "Bearer ")

  // 验证 JWT Token
  claims, err := validateJWT(tokenStr)
  if err != nil {
    return nil, status.Errorf(codes.Unauthenticated, "invalid token: %v", err)
  }

  // 将 claims 注入 context,供业务层使用
  ctx = context.WithValue(ctx, contextKeyUserID{}, claims.UserID)
  return handler(ctx, req)
}

// 业务层从 context 取出用户 ID
func (s *userServer) GetProfile(
  ctx context.Context, req *pb.GetProfileRequest,
) (*pb.ProfileResponse, error) {
  userID := ctx.Value(contextKeyUserID{}).(string)
  // ... 使用 userID 查询数据
}

链式拦截器

gRPC 通过 grpc.ChainUnaryInterceptor 将多个拦截器串联,执行顺序从左到右(类似洋葱模型)。

func main() {
  s := grpc.NewServer(
    // 链式拦截器:先执行 Logging,再执行 Recovery,最后执行 Auth
    grpc.ChainUnaryInterceptor(
      LoggingInterceptor,
      RecoveryInterceptor,   // 捕获 panic
      AuthInterceptor,
    ),
    // 流式拦截器
    grpc.ChainStreamInterceptor(
      StreamLoggingInterceptor,
      StreamAuthInterceptor,
    ),
  )
}

Recovery 拦截器(捕获 panic)

func RecoveryInterceptor(
  ctx     context.Context,
  req     interface{},
  info    *grpc.UnaryServerInfo,
  handler grpc.UnaryHandler,
) (resp interface{}, err error) {
  defer func() {
    if r := recover(); r != nil {
      log.Printf("PANIC recovered in %s: %v\n%s",
        info.FullMethod, r, debug.Stack())
      err = status.Errorf(codes.Internal, "internal error")
    }
  }()
  return handler(ctx, req)
}

Stream 服务端拦截器

func StreamLoggingInterceptor(
  srv     interface{},
  ss      grpc.ServerStream,
  info    *grpc.StreamServerInfo,
  handler grpc.StreamHandler,
) error {
  start := time.Now()
  log.Printf("Stream started: %s", info.FullMethod)

  err := handler(srv, ss)

  log.Printf("Stream ended: %s | duration=%v | err=%v",
    info.FullMethod, time.Since(start), err)
  return err
}

// 包装 ServerStream 以拦截 Send/Recv
type wrappedStream struct {
  grpc.ServerStream
}

func (w *wrappedStream) RecvMsg(m interface{}) error {
  err := w.ServerStream.RecvMsg(m)
  log.Printf("Stream Recv: %T %v", m, err)
  return err
}

func (w *wrappedStream) SendMsg(m interface{}) error {
  err := w.ServerStream.SendMsg(m)
  log.Printf("Stream Send: %T %v", m, err)
  return err
}

客户端拦截器

// 客户端拦截器:自动附加 Authorization header
func ClientAuthInterceptor(token string) grpc.UnaryClientInterceptor {
  return func(
    ctx     context.Context,
    method  string,
    req, reply interface{},
    cc     *grpc.ClientConn,
    invoker grpc.UnaryInvoker,
    opts   ...grpc.CallOption,
  ) error {
    // 注入 token 到 metadata
    ctx = metadata.AppendToOutgoingContext(ctx,
      "authorization", "Bearer "+token)
    return invoker(ctx, method, req, reply, cc, opts...)
  }
}

// 注册到 client connection
conn, _ := grpc.NewClient("localhost:50051",
  grpc.WithTransportCredentials(insecure.NewCredentials()),
  grpc.WithUnaryInterceptor(ClientAuthInterceptor(myToken)),
)

go-grpc-middleware 生态

社区提供了 go-grpc-middleware/v2 库,内置大量生产级拦截器,开箱即用。

# 安装
go get github.com/grpc-ecosystem/go-grpc-middleware/v2
import (
  "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
  "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
  "github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
)

// 使用 go-grpc-middleware 的开箱即用拦截器
s := grpc.NewServer(
  grpc.ChainUnaryInterceptor(
    // 结构化日志(兼容 logrus、zap、slog)
    logging.UnaryServerInterceptor(
      logging.LoggerFunc(func(ctx context.Context,
        lvl logging.Level, msg string, fields ...any) {
        slog.Log(ctx, slog.Level(lvl), msg, fields...)
      })),
    // Panic 恢复
    recovery.UnaryServerInterceptor(),
    // 限流
    ratelimit.UnaryServerInterceptor(&myRateLimiter{}),
  ),
)
包名功能
interceptors/logging结构化日志,支持 zap/logrus/slog
interceptors/recoverypanic 恢复,转换为 Internal 错误
interceptors/ratelimit限流拦截器
interceptors/validator自动验证请求字段
interceptors/timeout为每个请求设置超时
interceptors/tracingOpenTelemetry 链路追踪
interceptors/retry客户端自动重试

本章小结:拦截器是 gRPC 中实现横切关注点(Cross-Cutting Concerns)的标准机制。合理使用链式拦截器,可以把日志、认证、限流、监控等基础设施代码彻底从业务代码中分离,保持 handler 的纯粹。生产项目推荐基于 go-grpc-middleware 构建拦截器栈,成熟可靠。