MassTransit consumed on single server

Time:03-08

I am struggling to configure MassTransit with RabbitMQ to publish messages and subscribe to the queue. I think it's a simple configuration change. I have multiple service instances connected, but the messages are consumed alternately: each message goes to one server and is never delivered to or consumed on the other.

I would like each message to get delivered to every connection / subscriber.

I am running this on ASP.NET Core 6 with the latest version of MassTransit.

    services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);
    services.AddMassTransit(cfg =>
    {
        cfg.AddBus(context => Bus.Factory.CreateUsingRabbitMq(c =>
        {
            c.Host(connectionString, c =>
            {
                c.Heartbeat(10);
            });
            c.ConfigureEndpoints(
                context,
                KebabCaseEndpointNameFormatter.Instance);

            c.Publish<VideoManagerResultEvent>(x =>
            {
                x.BindQueue("result", "video-msgs");
                x.ExchangeType = Fanout;
            });

            c.ReceiveEndpoint("result:video-msgs", e =>
            {
                e.Consumer<VideoManagerResultConsumer>();
            });
        }));

        // Request clients / DTO
        RegisterRequestClients(cfg);
    });

    services.AddMassTransitHostedService();
}

private static void RegisterRequestClients(IServiceCollectionBusConfigurator cfg)
{
    cfg.AddRequestClient<VideoManagerResultEvent>();
}

// consumer
public class VideoManagerResultConsumer : BaseConsumer<VideoManagerResultEvent>
{
    public override async Task Consume(ConsumeContext<VideoManagerResultEvent> context)
    {
        Logger.Debug("Consumed video event");
        await context.RespondAsync(new GenericResponse());
    }
}

I call "SendMessage()" to publish a message to RabbitMQ.

// constructor and DI
public EventBusV2(IPublishEndpoint publishEndpoint)
{
    _publishEndpoint = publishEndpoint;
}

public async Task SendMessage()
{
    await _publishEndpoint.Publish<VideoManagerResultEvent>(msg);
}

To add to the original question, the diagrams show what is currently happening and what I need.

Diagram 1 (current behavior): the message is published to the event bus but is delivered to only one instance of the consumer.

Diagram 2 (required result): the message is delivered to both instances of the consumer.

[Image: Diagrams 1 and 2]

The RabbitMQ queue:

[Image: RabbitMQ queue]

CodePudding user response:

The only configuration you should need is the following:

services.AddMassTransit(cfg =>
{
    cfg.AddConsumer<VideoManagerResultConsumer>()
        .Endpoint(e => e.InstanceId = "Web1"); // or 2, etc.

    cfg.SetKebabCaseEndpointNameFormatter();

    cfg.UsingRabbitMq((context, c) =>
    {
        c.Host(connectionString, c =>
        {
            c.Heartbeat(10);
        });

        c.ConfigureEndpoints(context);
    });

    // Request clients / DTO
    RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();

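Any published message of type VideoManagerResultEvent will end up on a queue named after the consumer (video-manager-result), with the InstanceId appended. Because each instance sets a different InstanceId, each one gets its own queue and therefore its own copy of every message, instead of competing for messages on a single shared queue.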
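
If hard-coding "Web1", "Web2", and so on per deployment is inconvenient, the InstanceId can be derived at startup. The following is only a sketch: using Environment.MachineName as the unique value is an assumption (it works only when no two instances share a machine or container hostname), and MessagingRegistration and AddVideoMessaging are hypothetical names used for illustration. VideoManagerResultConsumer is the consumer from the question.

```csharp
using System;
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper class; the name is not part of MassTransit.
public static class MessagingRegistration
{
    public static IServiceCollection AddVideoMessaging(
        this IServiceCollection services, string connectionString)
    {
        services.AddMassTransit(cfg =>
        {
            // Assumption: the machine (or container) host name is unique
            // per running instance, so each instance gets its own queue.
            cfg.AddConsumer<VideoManagerResultConsumer>()
                .Endpoint(e => e.InstanceId = Environment.MachineName);

            cfg.SetKebabCaseEndpointNameFormatter();

            cfg.UsingRabbitMq((context, c) =>
            {
                c.Host(connectionString, h =>
                {
                    h.Heartbeat(10);
                });

                // Creates one receive endpoint per registered consumer.
                c.ConfigureEndpoints(context);
            });
        });

        return services;
    }
}
```

Any value works as long as it differs between instances; two instances with the same InstanceId share a queue and go back to splitting the messages between them.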
