Using Azure Functions, my MassTransit messages are never successfully consumed

Time:03-12

I have one Azure Functions project that sends a message to an Azure Service Bus queue. This works: I can see the message arrive in Azure.

I have another Azure Functions project that receives and consumes that message. The Service Bus trigger fires successfully, but the subsequent call to the consumer always fails.

Startup looks like so:

namespace MassTransitTest.PushNotificationFunction
{
    public class Startup : FunctionsStartup
    {
        private IConfigurationRoot config;

        public override void Configure(IFunctionsHostBuilder builder)
        {
            builder.Services.AddScoped<SendPushNotification>();
            builder.Services.AddMassTransitForAzureFunctions(cfg =>
            {
                cfg.AddConsumersFromNamespaceContaining<ConsumerNamespace>();
            });
        }
    }
}

My consumer looks like this:

namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
    public class EnergyBudgetExceededConsumer : IConsumer<EnergyBudgetExceeded>
    {
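        public Task Consume(ConsumeContext<EnergyBudgetExceeded> context)
        {
            LogContext.Debug?.Log("Successfully consumed message");
            // Nothing is awaited here, so return a completed task instead of marking the method async.
            return Task.CompletedTask;
        }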
    }
}

The struct used as a namespace marker for consumer registration:

namespace MassTransitTest.PushNotificationFunction.ServiceBusTemp
{
    public struct ConsumerNamespace
    {

    }
}

And the consumer uses the following interface:

namespace MassTransitTest.PushNotificationFunction.MessageModels
{
    public interface EnergyBudgetExceeded
    {
        string TestProperty { get; }
    }
}

The receiver function then looks like this:

public class SendPushNotification : BaseFunction
{
    private readonly IMessageReceiver _receiver;

    public SendPushNotification(IMessageReceiver receiver)
    {
        _receiver = receiver;
    }

    [FunctionName("EnergyBudgetExceeded")]
    public Task EnergyBudgetExceeded([ServiceBusTrigger("test-queue")]
        ServiceBusReceivedMessage message, CancellationToken cancellationToken)
    {
        return _receiver.HandleConsumer<EnergyBudgetExceededConsumer>("test-queue", message, cancellationToken);
    }
}

The function above is triggered successfully, but the "HandleConsumer" call never reaches the consumer. It throws an error, and the message ends up on the dead-letter queue.


An example of the message that is failing can be seen here:

{
  "messageId": "4c030000-306b-1065-1a55-08da035849d1",
  "conversationId": "4c030000-306b-1065-3928-08da035849d7",
  "sourceAddress": "sb://fakeurlservicebus.servicebus.windows.net/LAP16844_func_bus_joboyybopcegm7rhbdpygsnnyh?autodelete=300",
  "destinationAddress": "sb://blainetestservicebus.servicebus.windows.net/test-queue",
  "messageType": [
    "urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded BlaineTestMessage"
  ],
  "message": {
    "testProperty": "Energy budget"
  },
  "sentTime": "2022-03-11T12:11:35.6792405Z",
  "headers": {},
  "host": {
    "machineName": "LAP-16844",
    "processName": "func",
    "processId": 46740,
    "assembly": "func",
    "assemblyVersion": "3.0.3904.0",
    "frameworkVersion": "3.1.20",
    "massTransitVersion": "7.3.1.0",
    "operatingSystemVersion": "Microsoft Windows NT 6.2.9200.0"
  }
}

The consumer appears to be registered at startup as the docs suggest. I have also tried registering it explicitly by type, but that gives the same error.

Am I missing something obvious here?

I am running .NET Core 3.1 and using version 7.3.1 of all referenced MassTransit libraries.

CodePudding user response:

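The message type you are producing doesn't match the message type the consumer expects. MassTransit identifies a message by its namespace and type name, and your consumer is bound to EnergyBudgetExceeded in the MassTransitTest.PushNotificationFunction.MessageModels namespace, whereas the envelope declares a type from MassTransitTest.EnergyService.Functions: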

{
  "messageType": [
    "urn:message:MassTransitTest.EnergyService.Functions:EnergyBudgetExceeded BlaineTestMessage"
  ]
}

Does your produced message type implement that message contract/interface?
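
To illustrate why the namespace matters, here is a minimal, self-contained sketch. It assumes the producer project declares its own copy of the interface in the MassTransitTest.EnergyService.Functions namespace (a guess based on the envelope above, not something the question shows). The UrnFor helper is hypothetical and only mimics the urn:message:Namespace:TypeName convention for plain, non-generic types; it is not MassTransit's own formatter.

```csharp
using System;

namespace MassTransitTest.PushNotificationFunction.MessageModels
{
    // The contract the consumer in the question is bound to.
    public interface EnergyBudgetExceeded
    {
        string TestProperty { get; }
    }
}

namespace MassTransitTest.EnergyService.Functions
{
    // Hypothetical producer-side copy of the contract, declared in a different namespace.
    public interface EnergyBudgetExceeded
    {
        string TestProperty { get; }
    }
}

public static class MessageTypeSketch
{
    // Simplified illustration of the urn:message:Namespace:TypeName convention.
    // Only valid for plain, non-generic, non-nested types.
    public static string UrnFor(Type type) => $"urn:message:{type.Namespace}:{type.Name}";

    public static void Main()
    {
        var consumed = UrnFor(typeof(MassTransitTest.PushNotificationFunction.MessageModels.EnergyBudgetExceeded));
        var produced = UrnFor(typeof(MassTransitTest.EnergyService.Functions.EnergyBudgetExceeded));

        Console.WriteLine(consumed);
        Console.WriteLine(produced);

        // The two identifiers differ, so a consumer bound to the first type
        // would not be matched against a message declaring only the second.
        Console.WriteLine(consumed == produced);
    }
}
```

Two interfaces with identical members still count as different message types when their namespaces differ. One common way to avoid this is to put the contract in a single shared project that both function apps reference, so both sides use the same namespace and type name.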
