By following this document, I succeeded in setting message priority in RabbitMQ with the x-max-priority attribute (the sample code), and the received messages are prioritized. But I can't do the same thing with MassTransit. I found the answer here, but it doesn't work either. Here is my MassTransit configuration:
Producer's Startup.cs:
services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("username");
            h.Password("password");
        });
        cfg.EnablePriority(10);
        cfg.ConfigureEndpoints(context);
    });
});
Producer's OrdersController.cs:
[HttpPost("SendOrders")]
public async Task<IActionResult> SendOrders(IList<Order> orders)
{
    foreach (var order in orders)
    {
        await _busControl.Publish(order, context =>
        {
            context.SetPriority(order.Priority);
        });
        _logger.LogInformation($"Sent order: {order.Id} - {order.Name}");
    }
    return Ok();
}
Order.cs:
public class Order
{
    public int Id { get; set; }
    public string Name { get; set; }
    public byte Priority { get; set; }
}
Consumer's Startup.cs:
services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, configurator) =>
    {
        configurator.Host("localhost", "/", h =>
        {
            h.Username("username");
            h.Password("password");
        });
        //configurator.EnablePriority(10);
        configurator.ReceiveEndpoint("order-queue", c =>
        {
            c.EnablePriority(10);
            c.Consumer<OrderConsumer>();
        });
    });
});
OrderConsumer.cs:
public class OrderConsumer : IConsumer<Order>
{
    public async Task Consume(ConsumeContext<Order> context)
    {
        await Console.Out.WriteLineAsync($"Received order: {context.Message.Id} - {context.Message.Name}");
    }
}
The messages arrive in random order and don't follow my priority, even after I moved the EnablePriority call into the ReceiveEndpoint() method. Did I do something wrong here?
Thanks.
CodePudding user response:
Unless you specify a PrefetchCount of 1, messages are going to be pulled from the queue and processed up to the specified prefetch count. Once messages have been prefetched, the broker no longer orders them, so they can complete in any order regardless of priority.
If there are 7 messages waiting, with a mix of priorities, and the PrefetchCount is 8, all 7 will be read and processed concurrently.
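As a minimal sketch of that suggestion, the consumer's receive endpoint from the question could set PrefetchCount to 1 alongside EnablePriority. The wrapper class and method names (PriorityEndpointConfiguration, AddPrioritizedOrderConsumer) are hypothetical and only there to make the fragment self-contained. The host, credentials, queue name, and OrderConsumer are taken from the question.

```csharp
using MassTransit;
using Microsoft.Extensions.DependencyInjection;

// Hypothetical helper class; in the question this code lives in the consumer's Startup.cs.
public static class PriorityEndpointConfiguration
{
    public static void AddPrioritizedOrderConsumer(this IServiceCollection services)
    {
        services.AddMassTransit(x =>
        {
            x.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host("localhost", "/", h =>
                {
                    h.Username("username");
                    h.Password("password");
                });

                configurator.ReceiveEndpoint("order-queue", c =>
                {
                    // Declares the queue with x-max-priority = 10.
                    c.EnablePriority(10);

                    // Fetch one message at a time so the broker can hand over
                    // the highest-priority message that is still waiting.
                    c.PrefetchCount = 1;

                    c.Consumer<OrderConsumer>();
                });
            });
        });
    }
}
```

With a prefetch count of 1 the consumer handles a single message at a time, so this trades throughput for ordering. Also note that RabbitMQ does not let you change the arguments of an existing queue, so if order-queue was first created without x-max-priority, it likely needs to be deleted and redeclared before priorities take effect.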