I've been trying to set metadata in a server stream interceptor so that it can be read downstream by the actual RPC function:
// UserIDInterceptor is a stream server interceptor that sends an "X-User-Id"
// header on the stream and then hands the unchanged stream to the handler.
//
// The error returned by SendHeader is discarded.
// NOTE(review): SendHeader appears to write response headers towards the
// client rather than alter the incoming metadata the handler reads - confirm
// against the grpc.ServerStream documentation.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	header := metadata.New(map[string]string{"X-User-Id": "real_user_id"})
	ss.SendHeader(header)
	return handler(srv, ss)
}
// GetObjects looks up the "X-User-Id" entry in the stream context's incoming
// metadata, logs the value together with whether it was found, and returns nil.
func (s *Server) GetObjects(req *iam.GetObjectsRequest, client iam.Service_GetObjectsServer) error {
	userID, ok := HeaderFromMetadata(client.Context(), "X-User-Id")
	log.Printf("User ID: %s, Ok: %t\n", userID, ok)
	return nil
}
// HeaderFromMetadata returns the first value of the first header in headers
// that has at least one value in the incoming metadata of ctx.
//
// The boolean result is false when ctx carries no incoming metadata or when
// none of the requested headers has a value; the string is then empty.
func HeaderFromMetadata(ctx context.Context, headers ...string) (string, bool) {
	md, found := metadata.FromIncomingContext(ctx)
	if !found {
		return "", false
	}
	// Headers are tried in the order given; the first one present wins.
	for _, name := range headers {
		values := md.Get(name)
		if len(values) == 0 {
			continue
		}
		return values[0], true
	}
	return "", false
}
And my server is registered like this:
server := grpc.NewServer(
grpc.StreamInterceptor(UserIDInterceptor))
RegisterIAMServer(server, NewServer())
The problem I'm having is that the user ID header isn't being found. I can see that the interceptor is called when the client sends the request, and I can see that the metadata contains the header, but the actual RPC can't seem to extract it. What am I doing wrong here?
CodePudding user response:
Update
A simpler solution is to override only the Context() method of ServerStream:
// serverStream embeds a grpc.ServerStream and replaces only its context:
// every other method is promoted from the embedded stream unchanged.
type serverStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the context stored in the wrapper instead of the embedded
// stream's own context.
func (wrapped *serverStream) Context() context.Context {
	return wrapped.ctx
}
// UserIDInterceptor is a stream server interceptor that exposes an
// "X-User-Id" entry to the handler through the stream's incoming metadata.
//
// The handler receives a serverStream wrapping ss whose Context method
// returns a context derived from ss.Context() carrying the extended metadata.
// The result of handler is returned unchanged.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// When the context has no incoming metadata, incoming is nil; Copy still
	// yields a usable empty MD, so the user ID is attached in every case.
	incoming, _ := metadata.FromIncomingContext(ctx)

	// Work on a copy so the MD obtained from the context is never mutated.
	md := incoming.Copy()

	// Set (rather than Append) so that a value supplied by the client under
	// the same key cannot end up ahead of the one injected here.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, &serverStream{ss, metadata.NewIncomingContext(ctx, md)})
}
Update
Another simple solution is to define a wrapper around grpc.ServerStream, as below:
// serverStreamWrapper holds a grpc.ServerStream together with a replacement
// context. It forwards every stream operation to the held stream explicitly
// and answers Context from its own ctx field.
type serverStreamWrapper struct {
	ss  grpc.ServerStream
	ctx context.Context
}

// Context returns the wrapper's own context.
func (sw serverStreamWrapper) Context() context.Context {
	return sw.ctx
}

// RecvMsg forwards to the wrapped stream.
func (sw serverStreamWrapper) RecvMsg(msg interface{}) error {
	return sw.ss.RecvMsg(msg)
}

// SendMsg forwards to the wrapped stream.
func (sw serverStreamWrapper) SendMsg(msg interface{}) error {
	return sw.ss.SendMsg(msg)
}

// SendHeader forwards to the wrapped stream.
func (sw serverStreamWrapper) SendHeader(md metadata.MD) error {
	return sw.ss.SendHeader(md)
}

// SetHeader forwards to the wrapped stream.
func (sw serverStreamWrapper) SetHeader(md metadata.MD) error {
	return sw.ss.SetHeader(md)
}

// SetTrailer forwards to the wrapped stream.
func (sw serverStreamWrapper) SetTrailer(md metadata.MD) {
	sw.ss.SetTrailer(md)
}
// UserIDInterceptor is a stream server interceptor that exposes an
// "X-User-Id" entry to the handler through the stream's incoming metadata.
//
// The handler receives a serverStreamWrapper around ss whose Context method
// returns a context derived from ss.Context() carrying the extended metadata.
// The result of handler is returned unchanged.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// When the context has no incoming metadata, incoming is nil; Copy still
	// yields a usable empty MD, so the user ID is attached in every case.
	incoming, _ := metadata.FromIncomingContext(ctx)

	// Work on a copy so the MD obtained from the context is never mutated.
	md := incoming.Copy()

	// Set (rather than Append) so that a value supplied by the client under
	// the same key cannot end up ahead of the one injected here.
	md.Set("X-User-Id", "real_user_id")

	return handler(srv, serverStreamWrapper{ss, metadata.NewIncomingContext(ctx, md)})
}
You could use NewIncomingContext to create a copy of the current context in the stream.
Since grpc.ServerStream has no method for setting its context, a wrappedStream type is defined that holds a context.Context and provides a SetContext method, so that the new context can be attached to the stream:
// wrappedStream embeds a grpc.ServerStream and carries a context that can be
// replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// SetContext stores ctx in the wrapper.
func (ws *wrappedStream) SetContext(ctx context.Context) {
	ws.ctx = ctx
}
Full sample code:
// wrappedStream embeds a grpc.ServerStream and carries a context that can be
// replaced after construction.
type wrappedStream struct {
	grpc.ServerStream
	ctx context.Context
}

// Context returns the context stored in the wrapper rather than the embedded
// stream's own context.
func (ws *wrappedStream) Context() context.Context {
	return ws.ctx
}

// SetContext stores ctx in the wrapper.
func (ws *wrappedStream) SetContext(ctx context.Context) {
	ws.ctx = ctx
}

// RecvMsg passes m straight through to the embedded stream.
func (ws *wrappedStream) RecvMsg(m interface{}) error {
	return ws.ServerStream.RecvMsg(m)
}

// SendMsg passes m straight through to the embedded stream.
func (ws *wrappedStream) SendMsg(m interface{}) error {
	return ws.ServerStream.SendMsg(m)
}
// StreamContextWrapper is a grpc.ServerStream that additionally exposes
// SetContext, which takes a context.Context.
type StreamContextWrapper interface {
grpc.ServerStream
SetContext(context.Context)
}
// newStreamContextWrapper wraps ss in a wrappedStream whose context starts
// out as ss's own context.
func newStreamContextWrapper(ss grpc.ServerStream) StreamContextWrapper {
	return &wrappedStream{
		ServerStream: ss,
		ctx:          ss.Context(),
	}
}
// UserIDInterceptor is a stream server interceptor that exposes an
// "X-User-Id" entry to the handler through the stream's incoming metadata.
//
// The stream is wrapped via newStreamContextWrapper, and the wrapper's
// context is replaced with one derived from ss.Context() that carries the
// extended metadata. The result of handler is returned unchanged.
func UserIDInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	ctx := ss.Context()

	// When the context has no incoming metadata, incoming is nil; Copy still
	// yields a usable empty MD, so the user ID is attached in every case.
	incoming, _ := metadata.FromIncomingContext(ctx)

	// Work on a copy so the MD obtained from the context is never mutated.
	md := incoming.Copy()

	// Set (rather than Append) so that a value supplied by the client under
	// the same key cannot end up ahead of the one injected here.
	md.Set("X-User-Id", "real_user_id")

	sw := newStreamContextWrapper(ss)
	sw.SetContext(metadata.NewIncomingContext(ctx, md))
	return handler(srv, sw)
}