Home > Net >  Set message priority RabbitMQ/Masstransit?
Set message priority RabbitMQ/Masstransit?

Time:06-17

By following this document, I succeed in setting the priority for the RabbitMQ's messages with x-max-priority attribute (the sample code), the received messages are prioritized. But I can't do the same thing with the MassTransit. I found the answer here but it also doesn't work. Here is my MassTransit configuration:

Producer's Startup.cs:

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host("localhost", "/", h =>
        {
            h.Username("username");
            h.Password("password");
        });

        cfg.EnablePriority(10);
        cfg.ConfigureEndpoints(context);
    });
});

Producer's OrdersController.cs

[HttpPost("SendOrders")]
public async Task<IActionResult> SendOrders(IList<Order> orders)
{
    foreach(var order in orders)
    {
        await _busControl.Publish(order, context =>
        {
            context.SetPriority(order.Priority);
        });

        _logger.LogInformation($"Sent order: {order.Id} - {order.Name}");
    }

    return Ok();
}

Order.cs

public class Order
{
    public int Id { get; set; }
    public string Name { get; set; }
    public byte Priority { get; set; }
}

In Consumer's Startup.cs

services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, configurator) =>
    {
        configurator.Host("localhost", "/", h =>
        {
            h.Username("username");
            h.Password("password");
        });

        //configurator.EnablePriority(10);

        configurator.ReceiveEndpoint("order-queue", c =>
        {
            c.EnablePriority(10);
            c.Consumer<OrderConsumer>();
        });
    });
});

OrderConsumer.cs:

public class OrderConsumer : IConsumer<Order>
{
    public async Task Consume(ConsumeContext<Order> context)
    {
        await Console.Out.WriteLineAsync($"Received order: {context.Message.Id} - {context.Message.Name}");
    }
}

The messages come randomly and not follow my priority, even I move the c.EnablePriority out to ReceiveEndpoint()method. Did I do something wrong here?

Thanks.

CodePudding user response:

Unless you specify a PrefetchCount of 1, messages are going to be pulling from the queue and processed up to the specified prefetch count.

If there are 7 messages waiting, with a mix of priorities, and the PrefetchCount is 8, all 7 will be read and processed concurrently.

  • Related