Home > Net >  Modifying metadata on Go GRPC server streaming interceptor
Modifying metadata on Go GRPC server streaming interceptor

Time:10-21

I've been trying to set metadata in a server stream interceptor so that it can be read downstream by the actual RPC handler:

// UserIDInterceptor is a stream server interceptor that passes metadata
// containing the "X-User-Id" key to ss.SendHeader and then invokes handler
// with the original, unmodified stream.
//
// NOTE(review): SendHeader deals with header metadata going out on the stream;
// nothing here changes the context returned by ss.Context(), which is what the
// downstream handler reads its incoming metadata from.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    // The error returned by SendHeader is discarded here.
    ss.SendHeader(metadata.New(map[string]string{"X-User-Id": "real_user_id"}))
    return handler(srv, ss)
}

// GetObjects handles the GetObjects streaming RPC. It looks up the
// "X-User-Id" header in the stream context's incoming metadata, logs the value
// together with whether it was found, and returns nil without sending any
// messages on the stream.
func (server *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
    userID, ok := HeaderFromMetadata(client.Context(), "X-User-Id")
    log.Printf("User ID: %s, Ok: %t\n", userID, ok)

    return nil
}

// HeaderFromMetadata looks up the given header names, in order, in the
// incoming gRPC metadata attached to ctx.
//
// It returns the first value of the first header that has at least one value,
// together with true. It returns ("", false) when ctx carries no incoming
// metadata or none of the headers has a value.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
    incoming, found := metadata.FromIncomingContext(ctx)
    if !found {
        return "", false
    }

    for i := range headers {
        values := incoming.Get(headers[i])
        if len(values) == 0 {
            // This header is absent; try the next candidate name.
            continue
        }
        return values[0], true
    }

    return "", false
}

And my server is registered like this:

// Create the gRPC server with UserIDInterceptor installed as its stream
// interceptor, then register the IAM service implementation on it.
server := grpc.NewServer(grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())

The problem I'm having is that the user ID header isn't being found. I can see that the interceptor is called when the client sends the request, and I can see that the metadata contains the header, but the actual RPC can't seem to extract it. What am I doing wrong here?

CodePudding user response:

Update

A simpler solution is to override only the Context() method of grpc.ServerStream by embedding the stream in a wrapper type:

// serverStream wraps a grpc.ServerStream and carries a replacement context.
// Because the stream is embedded, every ServerStream method other than
// Context is promoted from the wrapped stream unchanged.
type serverStream struct {
    grpc.ServerStream
    ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the Context
// method promoted from the embedded grpc.ServerStream.
func (w *serverStream) Context() context.Context {
    return w.ctx
}

// UserIDInterceptor is a stream server interceptor that injects the
// "X-User-Id" header into the incoming metadata seen by the RPC handler.
//
// The handler is invoked with a wrapped stream whose Context() carries the
// updated metadata; every other stream method is forwarded to ss. The
// handler's error is returned unchanged.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        // Work on a copy so the metadata owned by the original context is
        // never mutated.
        md = md.Copy()
    } else {
        // No incoming metadata at all: start from an empty, writable MD so the
        // header is still added (writing to the nil MD would panic).
        md = metadata.MD{}
    }

    // Set replaces any value the client may have sent under the same key.
    // Append would leave a client-supplied value in first position, where
    // readers that take the first value would pick up the forged ID.
    md.Set("X-User-Id", "real_user_id")
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    return handler(srv, &serverStream{ss, newCtx})
}

Update

Another simple solution is to define a wrapper around grpc.ServerStream that delegates each method explicitly, as below:

// serverStreamWrapper holds a grpc.ServerStream together with a replacement
// context. It forwards every stream operation to the wrapped stream except
// Context, which returns the replacement.
type serverStreamWrapper struct {
    ss  grpc.ServerStream
    ctx context.Context
}

// Context returns the replacement context instead of the wrapped stream's.
func (wrapper serverStreamWrapper) Context() context.Context {
    return wrapper.ctx
}

// RecvMsg forwards to the wrapped stream's RecvMsg.
func (wrapper serverStreamWrapper) RecvMsg(msg interface{}) error {
    return wrapper.ss.RecvMsg(msg)
}

// SendMsg forwards to the wrapped stream's SendMsg.
func (wrapper serverStreamWrapper) SendMsg(msg interface{}) error {
    return wrapper.ss.SendMsg(msg)
}

// SendHeader forwards to the wrapped stream's SendHeader.
func (wrapper serverStreamWrapper) SendHeader(md metadata.MD) error {
    return wrapper.ss.SendHeader(md)
}

// SetHeader forwards to the wrapped stream's SetHeader.
func (wrapper serverStreamWrapper) SetHeader(md metadata.MD) error {
    return wrapper.ss.SetHeader(md)
}

// SetTrailer forwards to the wrapped stream's SetTrailer.
func (wrapper serverStreamWrapper) SetTrailer(md metadata.MD) {
    wrapper.ss.SetTrailer(md)
}

// UserIDInterceptor is a stream server interceptor that injects the
// "X-User-Id" header into the incoming metadata seen by the RPC handler.
//
// The handler is invoked with a serverStreamWrapper whose Context() carries
// the updated metadata and whose other methods delegate to ss. The handler's
// error is returned unchanged.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        // Work on a copy so the metadata owned by the original context is
        // never mutated.
        md = md.Copy()
    } else {
        // No incoming metadata at all: start from an empty, writable MD so the
        // header is still added (writing to the nil MD would panic).
        md = metadata.MD{}
    }

    // Set replaces any value the client may have sent under the same key.
    // Append would leave a client-supplied value in first position, where
    // readers that take the first value would pick up the forged ID.
    md.Set("X-User-Id", "real_user_id")
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    return handler(srv, serverStreamWrapper{ss, newCtx})
}

You can use metadata.NewIncomingContext to derive a new context from the stream's current context, carrying the modified metadata.

Since grpc.ServerStream has no method for replacing its context, define a wrappedStream type that embeds grpc.ServerStream, stores its own context.Context, and exposes a SetContext method for swapping that context:

// wrappedStream embeds a grpc.ServerStream and keeps a separately stored
// context that can be replaced through SetContext.
type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

// SetContext replaces the context stored in the wrapper with ctx.
func (ws *wrappedStream) SetContext(ctx context.Context) {
    ws.ctx = ctx
}

Full sample code:

// wrappedStream embeds a grpc.ServerStream and keeps a separately stored
// context that is returned by Context and can be replaced through SetContext.
type wrappedStream struct {
    grpc.ServerStream
    ctx context.Context
}

// Context returns the context stored in the wrapper, shadowing the Context
// method promoted from the embedded grpc.ServerStream.
func (ws *wrappedStream) Context() context.Context {
    return ws.ctx
}

// SetContext replaces the context stored in the wrapper with ctx.
func (ws *wrappedStream) SetContext(ctx context.Context) {
    ws.ctx = ctx
}

// RecvMsg forwards m to the embedded stream's RecvMsg and returns its error.
func (ws *wrappedStream) RecvMsg(m interface{}) error {
    err := ws.ServerStream.RecvMsg(m)
    return err
}

// SendMsg forwards m to the embedded stream's SendMsg and returns its error.
func (ws *wrappedStream) SendMsg(m interface{}) error {
    err := ws.ServerStream.SendMsg(m)
    return err
}

// StreamContextWrapper is a grpc.ServerStream that additionally exposes
// SetContext, taking a context.Context.
type StreamContextWrapper interface {
    grpc.ServerStream
    SetContext(context.Context)
}

// newStreamContextWrapper wraps ss in a *wrappedStream whose stored context is
// initialised to ss.Context(), and returns it as a StreamContextWrapper.
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
    wrapped := &wrappedStream{
        ServerStream: ss,
        ctx:          ss.Context(),
    }
    return wrapped
}

// UserIDInterceptor is a stream server interceptor that injects the
// "X-User-Id" header into the incoming metadata seen by the RPC handler.
//
// The handler is invoked with a StreamContextWrapper around ss whose context
// has been replaced by one carrying the updated metadata. The handler's error
// is returned unchanged.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
    md, ok := metadata.FromIncomingContext(ss.Context())
    if ok {
        // Work on a copy so the metadata owned by the original context is
        // never mutated.
        md = md.Copy()
    } else {
        // No incoming metadata at all: start from an empty, writable MD so the
        // header is still added (writing to the nil MD would panic).
        md = metadata.MD{}
    }

    // Set replaces any value the client may have sent under the same key.
    // Append would leave a client-supplied value in first position, where
    // readers that take the first value would pick up the forged ID.
    md.Set("X-User-Id", "real_user_id")
    newCtx := metadata.NewIncomingContext(ss.Context(), md)

    sw := newStreamContextWrapper(ss)
    sw.SetContext(newCtx)

    return handler(srv, sw)
}
  • Related