Home > Back-end >  Cann't get a Send Filter to work on MassTransit
Cann't get a Send Filter to work on MassTransit

Time:06-22

I'm trying to capture metadata coming from an HTTP Request (like user-id, trace-id, tenant-id, etc.) and send it through a MassTransit IRequestClient.

The client code sending the message is as follows (using HotChocolate to implement a GraphQL API):

public async Task<Client> ClientCreate(
    [Argument] ClientCreateInput input,
    [Service] IMapper mapper,
    [Service] IRequestClient<ClientCreate> requester,
    [Service] IMessageValidator<ClientCreate> validator,
    CancellationToken cancellationToken)
{
    var command = await validator.ValidateAsync(
        mapper.Map<ClientCreate>(input), cancellationToken);
    
    var response = await requester.GetResponse<ClientContract, ErrorResponse>(
        command, cancellationToken);

    return response.TransformResponseMessage(mapper.Map<Client>);
}

In order to do that, I created the following filter:

public class SendApiDomainMetadataFilter : IFilter<SendContext>
{
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope(nameof(SendApiDomainMetadataFilter));
    }

    public async Task Send(SendContext context, IPipe<SendContext> next)
    {
        Console.WriteLine("Sending metadata ...");
        if (context.TryGetPayload(out IServiceProvider? serviceProvider) &&
            serviceProvider!.GetService<IResolverContext>() is { } resolverContext &&
            resolverContext.GetDomainMetadata() is {} metadata)
        {
            var serializedMetadata = System.Text.Json.JsonSerializer.Serialize(metadata);
            Console.WriteLine("Sending metadata: '{0}'", serializedMetadata);
            context.Headers.Set(nameof(DomainMetadata), serializedMetadata);
        }

        await next.Send(context);
    }
}

The problem is that the filter is never called. I tried the following setup:


builder.Services.AddMassTransit(bus =>
{
    bus.UsingRabbitMq((context, configurator) =>
    {
        // ...

        configurator.UseConsumeFilter(typeof(ValidateConsumeFilter<>), context);
        configurator.UseSendFilter(typeof(SendApiDomainMetadataFilter), context);

        configurator.ConfigureSend(sendConfigurator =>
        {
            Console.WriteLine("Configuring send pipeline");
            sendConfigurator.UseFilter(new SendApiDomainMetadataFilter());
        });

        configurator.ConfigureEndpoints(context);

        configurator.Host(...);
    });
});

I guess that only one of configurator.UseSendFilter and sendConfigurator.UseFilter should be necessary to call send on the filter, but none are being useful for that.

What am I missing from this configuration to make every call of IRequestClient.Send to go through SendApiDomainMetadataFilter.Send before sending the message to RabbitMQ?

UPDATE: Indeed, I previously tried to create a Scoped Filter like the one I have for ValidateConsumeFilter. All the examples work on the Consume side. The issue is that if I send a message from outside a consumer/saga, the Send pipeline is not being used. After Chris suggestions, I created a simple DummySendFilter:

public class DummySendFilter<TMessage> : IFilter<SendContext<TMessage>>
    where TMessage : class
{
    public void Probe(ProbeContext context)
    {
        context.CreateFilterScope("DummySendFilter");
    }

    public async Task Send(SendContext<TMessage> context, IPipe<SendContext<TMessage>> next)
    {
        Console.WriteLine("Dummy Sending "   typeof(TMessage).Name);
        await next.Send(context);
    }
}

And because I have two services communicating through MassTransit (gateway and service), I added the dummy filter in both, like:

// Inside UsingRabbitMq
configurator.UseSendFilter(typeof(DummySendFilter<>), svp);

When I send the message from the gateway using the interface IRequestClient<ClientCreate>, the message is send without going through the dummy filter. Then It gets to the consummer, after going through the validation filter (as expected), and then, when the consumer responds, that response message goes through the dummy filter (configured on the service, not on the gateway), printing Dummy Sending ClientContract, (as expected).

I just don't know how to activate the send filter from the gateway, even before it gets to RabbitMQ

UPDATE: Using a filter on PublishContext worked as suggested by the answer. Thanks Chris for pointing to the right direction

CodePudding user response:

Scoped filters should be generic, just like your consume filter. And you can more easily get your filter dependency from the constructor as a scoped filter instead of all that workaround stuff being done to "get domain metadata."

Since you have a scoped consume filter, you should already understand what you need to create the send filter properly.

UPDATE: It's likely because your request is published, and not sent. So create a matched set of send/publish filters to have either scenario.

  • Related