拦截器类型
gRPC 有四种拦截器,覆盖服务端/客户端 × Unary/Stream 四个维度:
| 拦截器类型 | 类型签名 | 用途 |
|---|---|---|
| UnaryServerInterceptor | 服务端 Unary RPC 拦截 | 日志、认证、限流 |
| StreamServerInterceptor | 服务端 Stream RPC 拦截 | 流的认证、监控 |
| UnaryClientInterceptor | 客户端 Unary RPC 拦截 | 重试、链路追踪 |
| StreamClientInterceptor | 客户端 Stream RPC 拦截 | 重连、监控 |
Unary 服务端拦截器
// 拦截器函数签名
type UnaryServerInterceptor func(
ctx context.Context,
req interface{}, // 请求消息
info *grpc.UnaryServerInfo, // 方法信息
handler grpc.UnaryHandler, // 实际处理函数
) (interface{}, error)
日志拦截器(记录请求耗时)
func LoggingInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
// 调用实际处理函数
resp, err := handler(ctx, req)
duration := time.Since(start)
code := codes.OK
if err != nil {
code = status.Code(err)
}
log.Printf(
"gRPC | method=%-50s | code=%-15s | duration=%v",
info.FullMethod, code, duration,
)
return resp, err
}
JWT 认证拦截器
func AuthInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
// 白名单:不需要认证的方法
publicMethods := map[string]bool{
"/auth.v1.AuthService/Login": true,
"/auth.v1.AuthService/Register": true,
}
if publicMethods[info.FullMethod] {
return handler(ctx, req)
}
// 从 Metadata 中提取 Authorization header
md, ok := metadata.FromIncomingContext(ctx)
if !ok {
return nil, status.Error(codes.Unauthenticated, "missing metadata")
}
authValues := md.Get("authorization")
if len(authValues) == 0 {
return nil, status.Error(codes.Unauthenticated, "missing token")
}
tokenStr := strings.TrimPrefix(authValues[0], "Bearer ")
// 验证 JWT Token
claims, err := validateJWT(tokenStr)
if err != nil {
return nil, status.Errorf(codes.Unauthenticated, "invalid token: %v", err)
}
// 将 claims 注入 context,供业务层使用
ctx = context.WithValue(ctx, contextKeyUserID{}, claims.UserID)
return handler(ctx, req)
}
// 业务层从 context 取出用户 ID
func (s *userServer) GetProfile(
ctx context.Context, req *pb.GetProfileRequest,
) (*pb.ProfileResponse, error) {
userID := ctx.Value(contextKeyUserID{}).(string)
// ... 使用 userID 查询数据
}
链式拦截器
gRPC 通过 grpc.ChainUnaryInterceptor 将多个拦截器串联,执行顺序从左到右(类似洋葱模型)。
func main() {
s := grpc.NewServer(
// 链式拦截器:先执行 Logging,再执行 Recovery,最后执行 Auth
grpc.ChainUnaryInterceptor(
LoggingInterceptor,
RecoveryInterceptor, // 捕获 panic
AuthInterceptor,
),
// 流式拦截器
grpc.ChainStreamInterceptor(
StreamLoggingInterceptor,
StreamAuthInterceptor,
),
)
}
Recovery 拦截器(捕获 panic)
func RecoveryInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (resp interface{}, err error) {
defer func() {
if r := recover(); r != nil {
log.Printf("PANIC recovered in %s: %v\n%s",
info.FullMethod, r, debug.Stack())
err = status.Errorf(codes.Internal, "internal error")
}
}()
return handler(ctx, req)
}
Stream 服务端拦截器
func StreamLoggingInterceptor(
srv interface{},
ss grpc.ServerStream,
info *grpc.StreamServerInfo,
handler grpc.StreamHandler,
) error {
start := time.Now()
log.Printf("Stream started: %s", info.FullMethod)
err := handler(srv, ss)
log.Printf("Stream ended: %s | duration=%v | err=%v",
info.FullMethod, time.Since(start), err)
return err
}
// 包装 ServerStream 以拦截 Send/Recv
type wrappedStream struct {
grpc.ServerStream
}
func (w *wrappedStream) RecvMsg(m interface{}) error {
err := w.ServerStream.RecvMsg(m)
log.Printf("Stream Recv: %T %v", m, err)
return err
}
func (w *wrappedStream) SendMsg(m interface{}) error {
err := w.ServerStream.SendMsg(m)
log.Printf("Stream Send: %T %v", m, err)
return err
}
客户端拦截器
// 客户端拦截器:自动附加 Authorization header
func ClientAuthInterceptor(token string) grpc.UnaryClientInterceptor {
return func(
ctx context.Context,
method string,
req, reply interface{},
cc *grpc.ClientConn,
invoker grpc.UnaryInvoker,
opts ...grpc.CallOption,
) error {
// 注入 token 到 metadata
ctx = metadata.AppendToOutgoingContext(ctx,
"authorization", "Bearer "+token)
return invoker(ctx, method, req, reply, cc, opts...)
}
}
// 注册到 client connection
conn, _ := grpc.NewClient("localhost:50051",
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithUnaryInterceptor(ClientAuthInterceptor(myToken)),
)
go-grpc-middleware 生态
社区提供了 go-grpc-middleware/v2 库,内置大量生产级拦截器,开箱即用。
# 安装
go get github.com/grpc-ecosystem/go-grpc-middleware/v2
import (
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/logging"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/recovery"
"github.com/grpc-ecosystem/go-grpc-middleware/v2/interceptors/ratelimit"
)
// 使用 go-grpc-middleware 的开箱即用拦截器
s := grpc.NewServer(
grpc.ChainUnaryInterceptor(
// 结构化日志(兼容 logrus、zap、slog)
logging.UnaryServerInterceptor(
logging.LoggerFunc(func(ctx context.Context,
lvl logging.Level, msg string, fields ...any) {
slog.Log(ctx, slog.Level(lvl), msg, fields...)
})),
// Panic 恢复
recovery.UnaryServerInterceptor(),
// 限流
ratelimit.UnaryServerInterceptor(&myRateLimiter{}),
),
)
| 包名 | 功能 |
|---|---|
interceptors/logging | 结构化日志,支持 zap/logrus/slog |
interceptors/recovery | panic 恢复,转换为 Internal 错误 |
interceptors/ratelimit | 限流拦截器 |
interceptors/validator | 自动验证请求字段 |
interceptors/timeout | 为每个请求设置超时 |
interceptors/tracing | OpenTelemetry 链路追踪 |
interceptors/retry | 客户端自动重试 |
本章小结:拦截器是 gRPC 中实现横切关注点(Cross-Cutting Concerns)的标准机制。合理使用链式拦截器,可以把日志、认证、限流、监控等基础设施代码彻底从业务代码中分离,保持 handler 的纯粹。生产项目推荐基于 go-grpc-middleware 构建拦截器栈,成熟可靠。