I am trying to set up MassTransit with RabbitMQ for my application.
I am using CastleWindsor DI and .Net Framework 4.7.
The way I register my consumers and bus in the Program.cs is:
container.AddMassTransit(x =>
{
x.AddBus(context => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(configurationProvider.RabbitHostName);
x.AddConsumer<FactAddedFirstHandler>();
x.AddConsumer<FactAddedSecondHandler>();
cfg.ReceiveEndpoint("addedFactQueue", ec =>
{
cfg.ConfigureEndpoints(container);
});
}));
});
IBusControl busControl = container.Kernel.Resolve<IBusControl>();
busControl.StartAsync();
An example of an Handler is:
internal class FactAddedFirstHandler : IConsumer<FactAddedEvent>
{
public async Task Consume(ConsumeContext<FactAddedEvent> context)
{
//Do stuff
}
}
Let's call this main running application simply as Application. What I want to achieve are 2 different things:
1) whenever a FactAddedEvent is thrown (from another service) then Application delivers the message to both consumers: FactAddedFirstHandler and FactAddedSecondHandler.
This happens correctly because if I understood correctly MassTransit creates an exchange and delivers the message to all the consumers subscribed to that message.
2) if I run 2 instances of Application (let's call them Application1 and Application2) then Round-Robin is applied. Meaning that RabbitMQ balances the messages to the Application istance that is free.
This happens partly correctly. Meaning that if I have both istances up and running and try to send a FactAddedEvent then one time the event is received by Application1 and one time by Application2. Also, if I bring for example Application2 down then all the messages are received by Application1. So far, everything working properly.
THE PROBLEM: the problem I have appears when I am voluntary blocking for example Application1. Let's say I send the FactAddedEvent, it's received by Application1 but in the consuming method of the handler I set a breakpoint and I wait there. Therefore here I am voluntary blocking Application1. Now what I would expect is that since Application1 is blocked, RabbitMQ will deliver all the next events I throw to ONLY Application2. Instead what it just does is: once RabbitMQ sends it to Application1 once it sends it to Application2, no matter what. So in Application1 there will be a queue of messages piling up even though Application2 is free to do work.
What am I doing wrong? Thanks in advance
CodePudding user response:
RabbitMQ channels have a PrefetchCount, which is the number of unacknowledged messages the broker will send to a consumer before pausing to wait for those messages to be acknowledged. By default, MassTransit sets this to 16, or the number of CPU cores x 2, whichever is greater. This setting is for each receive endpoint.
When running multiple instances, the broker will decide based on its algorithm which channel/consumer receives the message. The rule is to deliver messages directly to consumers with available prefetch to avoid an initial write to disk. Once all consumers prefetch limits are reached, messages are buffered.
Even in a breakpoint, the server thinks you're still alive and kicking, so it will continue to send messages until you fill up.
With the configuration you're using, you can create a ConsumerDefinition
for your consumer to change that prefetch value for the receive endpoint. You can also just configure the consumer inline, using the .Endpoint()
syntax:
x.AddConsumer<FactAddedFirstHandler>()
.Endpoint(x => x.PrefetchCount = 1);
That count is really low, and I wouldn't recommend that value in production, but for your testing it is fine.
Hopefully that gives you some tools to get things working the way you expect.