Home > Back-end >  MassTransit Consumer doesn't consume message from the Azure service bus queue
MassTransit Consumer doesn't consume message from the Azure service bus queue

Time:09-06

I have two service API and ConsoleApp. API will send message to the Azure service bus queue and ConsoleApp should consume that message from the queue.

Long story short, I am able to send the message using MassTransit ISendEndpoint on the queue (actual message appears on the Azure Portal ASB queue) but I am unable to consume it from the ConsoleApp.

API.Program.cs

builder.Services.AddMassTransit(mt =>
{
    mt.UsingAzureServiceBus((context, cfg) =>
    {
        string connectionString = "myConnString";
        string queue = "test";

        cfg.MaxSizeInMegabytes = 5120;
        cfg.Host(connectionString);

        cfg.ReceiveEndpoint(queue, ec =>
        {
            ec.MaxSizeInMegabytes = 5120;
            ec.DefaultMessageTimeToLive = TimeSpan.FromDays(5);

            ec.UseRetry(x => x.Interval(5, TimeSpan.FromSeconds(1)));
            ec.UseScheduledRedelivery(x => x.Incremental(5, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5)));
        });

    });
});

On the ConsoleApp I have registered consumer

builder.Services.AddMassTransit(mt =>
    {
        mt.AddConsumer<MyMessageConsumer>();

        mt.UsingAzureServiceBus((context, cfg) =>
        {
            string connectionString = "myConnString";
            string queue = "test";
    
            cfg.MaxSizeInMegabytes = 5120;
            cfg.Host(connectionString);
    
            cfg.ReceiveEndpoint(queue, ec =>
            {
                ec.MaxSizeInMegabytes = 5120;
                ec.DefaultMessageTimeToLive = TimeSpan.FromDays(5);

                ec.ConfigureConsumer<MyMessageConsumer>(context);
    
                ec.UseRetry(x => x.Interval(5, TimeSpan.FromSeconds(1)));
                ec.UseScheduledRedelivery(x => x.Incremental(5, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5)));
          });
   });

And actual consumer

public class MyMessageConsumer : IConsumer<MyMessage>
    {
        public Task Consume(ConsumeContext<MyMessage> context)
        {
            return Task.CompletedTask;
        }
    }
        });

I have registered consumer with receive endpoint to listen on specific queue but still I'm not able to consume the message (message still appears on the queue). What I'm doing wrong here?

Update: My API.Startup now configures with minimal MassTransit config as suggested from the answer below

builder.Services.AddMassTransit(mt =>
{
    mt.UsingAzureServiceBus((context, cfg) =>
    {
        string connectionString = "myConnString";

        cfg.Host(connectionString);
    });
});

It still successfully sends the message to the queue and still don't consume from the consumer in other project. No messages in the skipped_queue.

CodePudding user response:

Well, you are likely consuming the message from your API, and it's being moved to the _skipped queue. Your API should have a minimal MassTransit configuration, since it isn't consuming messages:

builder.Services.AddMassTransit(mt =>
{
    mt.UsingAzureServiceBus((context, cfg) =>
    {
        string connectionString = "myConnString";
        string queue = "test";

        cfg.Host(connectionString);

        cfg.MaxSizeInMegabytes = 5120;
    });
});

And in your consumer service, you should use proper configuration as well (you were using outdated methods and configured out of order, essentially making them non-existent).

builder.Services.AddMassTransit(mt =>
{
    mt.AddConsumer<MyMessageConsumer>();

    mt.UsingAzureServiceBus((context, cfg) =>
    {
        string connectionString = "myConnString";
        string queue = "test";

        cfg.Host(connectionString);

        cfg.ReceiveEndpoint(queue, ec =>
        {
            ec.MaxSizeInMegabytes = 5120;
            ec.DefaultMessageTimeToLive = TimeSpan.FromDays(5);

            ec.UseMessageRetry(x => x.Interval(5, TimeSpan.FromSeconds(1)));
            ec.UseDelayedRedelivery(x => x.Incremental(5, TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(5)));

            ec.ConfigureConsumer<MyMessageConsumer>(context);
      });
});

Also, make sure that MyMessage is in the same namespace for both projects or the message will not be consumed.

  • Related