Home > other >  MassTransit - Configuration exception when adding consumers dynamically
MassTransit - Configuration exception when adding consumers dynamically

Time:07-22

I am using MassTransit 8.0.5 with RabbitMQ for service bus implementation in my .NET 6 microservices. I published a message from Service-A and I can see the exchange created by namespace without any queue. The problem is happening when I start my consumer Service-B. It throws the following configuration exception.

enter image description here

Here is my configuration:

public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
    {
        var serviceProvider = services.BuildServiceProvider();

        services.AddMassTransit(configure =>
        {
            configure.SetKebabCaseEndpointNameFormatter();
            configure.AddConsumers(assembly);

            configure.UsingRabbitMq((context, configurator) =>
            {
                var rabbitSettings = serviceProvider.GetService<IOptions<RabbitSettings>>().Value;
                var host = new Uri("rabbitmq://"   rabbitSettings.EventBusConnection);

                configurator.Host(host, h =>
                {
                    h.Username(rabbitSettings.EventBusUserName);
                    h.Password(rabbitSettings.EventBusPassword);
                });

                var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
                    .Where(x => x.BaseType == typeof(IntegrationEvent));

                foreach (var type in types)
                {
                    var consumers = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
                        .Where(x => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(type))).ToList();

                    if (consumers.Any())
                    {
                        // rabbitSettings.QueueName => service-b
                        configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                            {
                                e.UseConsumeFilter(typeof(InboxFilter<>), context);
                                foreach (var consumer in consumers)
                                {
                                    configurator.ConfigureEndpoints(context, x => x.Exclude(consumer));

                                    var methodInfo = typeof(DependencyInjectionReceiveEndpointExtensions)
                                        .GetMethods()
                                        .Where(x => x.GetParameters()
                                            .Any(p => p.ParameterType == typeof(IServiceProvider)))
                                        .FirstOrDefault(x => x.Name == "Consumer" && x.IsGenericMethod);

                                    var generic = methodInfo?.MakeGenericMethod(consumer);
                                    generic?.Invoke(e, new object[] { e, context, null });
                                }
                            });
                    }
                }
            });
        });

        return services;
    }

IntegrationEvent, which is excluded from topology, is my base type of all integration events. I am trying to add consumers dynamically, but could not figure out what's wrong?

Any help would be appreciated.

CodePudding user response:

Simple answer, you can't call ReceiveEndpoint() with the same queue name twice.

Better answer, your configuration is a mess, and all that reflection is unnecessary. I've simplified it for you.

public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
{
    var serviceProvider = services.BuildServiceProvider();

    services.AddMassTransit(configure =>
    {
        configure.SetKebabCaseEndpointNameFormatter();

        var allTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes()).ToList();

        var eventTypes = allTypes.Where(x => x.BaseType == typeof(IntegrationEvent)).ToList();

        var consumerTypes = allTypes.Where(x => eventTypes.Any(et => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(et)))).ToList();

        configure.AddConsumers(consumerTypes);

        configure.UsingRabbitMq((context, configurator) =>
        {
            var rabbitSettings = serviceProvider.GetService<IOptions<RabbitSettings>>().Value;
            var host = new Uri("rabbitmq://"   rabbitSettings.EventBusConnection);

            configurator.Host(host, h =>
            {
                h.Username(rabbitSettings.EventBusUserName);
                h.Password(rabbitSettings.EventBusPassword);
            });

            if (consumerTypes.Any())
            {
                // rabbitSettings.QueueName => service-b
                configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                {
                    e.UseConsumeFilter(typeof(InboxFilter<>), context);
                    e.ConfigureConsumers(context);
                });
            }
        });
    });

    return services;
}
  • Related