I am struggling to configure MassTransit with RabbitMQ to publish messages and subscribe to the queue. I think that its a simple configuration change that needs to be done. I have multiple services connected but the messages get consumed alternatively and never gets delivered / consumed on the other server.
I would like each message to get delivered to every connection / subscriber.
I am running this on ASP.net core 6 on the latest version of MassTransit.
services.TryAddSingleton(KebabCaseEndpointNameFormatter.Instance);
services.AddMassTransit(cfg =>
{
cfg.AddBus(context => Bus.Factory.CreateUsingRabbitMq(c =>
{
c.Host(connectionString, c =>
{
c.Heartbeat(10);
});
c.ConfigureEndpoints(
context,
KebabCaseEndpointNameFormatter.Instance);
c.Publish<VideoManagerResultEvent>(x =>
{
x.BindQueue("result", "video-msgs");
x.ExchangeType = Fanout;
});
c.ReceiveEndpoint("result:video-msgs", e =>
{
e.Consumer<VideoManagerResultConsumer>();
});
}));
// Request clients / DTO
RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();
}
private static void RegisterRequestClients(IServiceCollectionBusConfigurator cfg)
{
cfg.AddRequestClient<VideoManagerResultEvent>();
}
// consumer
public class VideoManagerResultConsumer : BaseConsumer<VideoManagerResultEvent>
{
public override async Task Consume(ConsumeContext<VideoManagerResultEvent> context)
{
Logger.Debug("Consumed video event");
await context.RespondAsync(new GenericResponse());
}
}
I call "SendMessage()" to publish a message to RabbitMQ.
// constructor and DI
public EventBusV2(IPublishEndpoint publishEndpoint)
{
_publishEndpoint = publishEndpoint;
}
public async Task SendMessage()
{
await _publishEndpoint.Publish<VideoManagerResultEvent>(msg);
}
To add to the original question the diagram display what is currently happening.
Diagram 1 - The request or message gets published to the eventbus but only gets delivered to the one instance of the consumer.
Diagram 2 - The required result the message gets published to both instances of the consumer.
The RaabitMQ Queue
CodePudding user response:
The only configuration you should need is the following:
services.AddMassTransit(cfg =>
{
cfg.AddConsumer<VideoManagerResultConsumer>()
.Endpoint(e => e.InstanceId = "Web1"); // or 2, etc.
cfg.SetKebabCaseEndpointNameFormatter();
cfg.UsingRabbitMq((context, c) =>
{
c.Host(connectionString, c =>
{
c.Heartbeat(10);
});
c.ConfigureEndpoints(context);
});
// Request clients / DTO
RegisterRequestClients(cfg);
});
services.AddMassTransitHostedService();
Any published messages of type VideoManagerResultEvent
will end up on the queue video-manager-result
based upon the consumer name.