Home > Net >  Masstransit with multiple consumers of same message fire multiple times in .NET 6
Masstransit with multiple consumers of same message fire multiple times in .NET 6

Time:11-23

I am having a simple scenario where I am publishing certain message using IPublishEndpoint and I want whatever microservice engages a consumer for it to consume independently on other microservices, but JUST ONCE not 10x. When I configure as documentation says it does not behave as described. It appears to multiplicate the message consumer-count-ish times as each consumer is not firing just once, but 3x in my case. Why?

Exact scenario: I have 3 independent microservices running in docker as mvc projects held in one solution, interconnected with core library where contracts resides. Each project has its own implementation of IConsumer of SAME contract class from core library and every project is registering that consumer at startup using same rabbitmq instance and virtualhost. For demonstration I have simplified the code to minimum.

From vague and confusing masstransit documentation I could not find why is behaving like this or what I am doing wrong nor how should I configure it properly (https://masstransit-project.com/). Masstransit documentation is very fragmented and does not explain much what their main configuration methods actually do for real in rabbitmq.

public interface ISystemVariableChanged
{
    /// <summary>Variable key that was modified.</summary>
    public string Key { get; set; }

    /// <summary>Full reload requested.</summary>
    public bool FullReload { get; set; }
}

3 consumers like this:

public class SystemVariableChangedConsumer : IConsumer<ISystemVariableChanged>
{
    private readonly ILogger<SystemVariableChangedConsumer > logger;

    public SystemVariableChangedConsumer (ILogger<SystemVariableChangedConsumer > logger)
    {
        this.logger = logger;
    }

    public async Task Consume(ConsumeContext<ISystemVariableChanged> context)
    {
        logger.LogInformation("Variable changed in /*ProjectName*/"); // differs per project

        await Task.CompletedTask;
    }
}

3x Startup like this

        services.AddMassTransit(bus =>
        {
            bus.AddConsumer<SystemVariableChangedConsumer>();
            // bus.AddConsumer<SystemVariableChangedConsumer>().Endpoint(p => p.InstanceId = "/*3 different values*/"); // not working either 

            bus.SetKebabCaseEndpointNameFormatter();

            bus.UsingRabbitMq((context, rabbit) =>
            {
                rabbit.Host(options.HostName, options.VirtualHost, h =>
                {
                    h.Username(options.UserName);
                    h.Password(options.Password);
                });

                rabbit.UseInMemoryOutbox();
                rabbit.UseJsonSerializer();
                rabbit.UseRetry(cfg => cfg.Incremental(options.RetryLimit, TimeSpan.FromSeconds(options.RetryTimeout), TimeSpan.FromSeconds(options.RetryTimeout)));

                // rabbit.ConfigureEndpoints(bus); // not working either

                // not working either
                rabbit.ReceiveEndpoint("system-variable-changed", endpoint =>
                {
                    endpoint.ConfigureConsumer<SystemVariableChangedConsumer>(context);
                });
            });
        });

I tried many setups and they tend to behave quite the same wrong way (eg. setting endpoint instance ID etc.).

Regardless if I use ReceiveEndpoint method to configure per endpoint manually or ConfigureEndpoints to configure them all it makes not much of a difference.

I read various materials about this but they did not helped with masstransit setup. This should be absolute basic usecase easily achiveable, idk.

In rabbitmq console it created 1 interface exchange routing to 3 sub-exchanges created per consumer and each of those bound to final queue.

I am looking for some clean solution, not hardcoded queue names.

Can anyone help me with correct startup setup?

Thank you

CodePudding user response:

This is all that is required:

services.AddMassTransit(bus =>
{
    // assuming the same consumer is used, in the same namespace.
    // If the consumers have different names/namespaces, InstanceId is not required
    bus.AddConsumer<SystemVariableChangedConsumer>()
        .Endpoint(p => p.InstanceId = "/*3 different values*/");

    bus.SetKebabCaseEndpointNameFormatter();

    bus.UsingRabbitMq((context, rabbit) =>
    {
        rabbit.Host(options.HostName, options.VirtualHost, h =>
        {
            h.Username(options.UserName);
            h.Password(options.Password);
        });

        rabbit.UseMessageRetry(cfg => cfg.Incremental(options.RetryLimit, TimeSpan.FromSeconds(options.RetryTimeout), TimeSpan.FromSeconds(options.RetryTimeout)));
        rabbit.UseInMemoryOutbox();

        rabbit.ConfigureEndpoints(context);
    });
});

I'd suggest clearing your entire broker exchange/queue binding history before running it, since previous bindings might be causing redelivery issues. But RabbitMQ is usually good about preventing duplicate deliveries for the same message to the same exchange.

  • Related