I am using MassTransit 8.0.5 with RabbitMQ as the service bus implementation in my .NET 6 microservices. I published a message from Service-A, and I can see that the exchange was created by namespace, without any queue. The problem occurs when I start my consumer, Service-B: it throws the following configuration exception.
Here is my configuration:
/// <summary>
/// Registers MassTransit with the RabbitMQ transport and wires consumers of
/// <c>IntegrationEvent</c>-derived messages onto the queue named by
/// <c>RabbitSettings.QueueName</c>.
/// </summary>
/// <param name="services">The service collection MassTransit is added to.</param>
/// <param name="assembly">Assembly passed to <c>AddConsumers</c> for consumer registration.</param>
/// <returns>The same <paramref name="services"/> instance.</returns>
public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
{
    // NOTE(review): this builds a separate provider from the collection solely to read RabbitSettings below.
    var serviceProvider = services.BuildServiceProvider();
    services.AddMassTransit(configure =>
    {
        configure.SetKebabCaseEndpointNameFormatter();
        configure.AddConsumers(assembly);
        configure.UsingRabbitMq((context, configurator) =>
        {
            var rabbitSettings = serviceProvider.GetService<IOptions<RabbitSettings>>().Value;
            var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);
            configurator.Host(host, h =>
            {
                h.Username(rabbitSettings.EventBusUserName);
                h.Password(rabbitSettings.EventBusPassword);
            });

            // Every loaded type whose direct base type is IntegrationEvent.
            var types = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
                .Where(x => x.BaseType == typeof(IntegrationEvent));
            foreach (var type in types)
            {
                // Every loaded type assignable to IConsumer<type>.
                var consumers = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes())
                    .Where(x => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(type))).ToList();
                if (consumers.Any())
                {
                    // rabbitSettings.QueueName => service-b
                    // NOTE(review): this runs once per event type that has consumers, always with the
                    // same queue name - presumably what triggers the configuration exception; confirm.
                    configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
                    {
                        e.UseConsumeFilter(typeof(InboxFilter<>), context);
                        foreach (var consumer in consumers)
                        {
                            // NOTE(review): ConfigureEndpoints is invoked on the bus configurator once per
                            // consumer, from inside the receive endpoint callback - verify this is intended.
                            configurator.ConfigureEndpoints(context, x => x.Exclude(consumer));

                            // Look up, via reflection, a generic method named "Consumer" that has an
                            // IServiceProvider parameter, then close it over the consumer type.
                            var methodInfo = typeof(DependencyInjectionReceiveEndpointExtensions)
                                .GetMethods()
                                .Where(x => x.GetParameters()
                                    .Any(p => p.ParameterType == typeof(IServiceProvider)))
                                .FirstOrDefault(x => x.Name == "Consumer" && x.IsGenericMethod);
                            var generic = methodInfo?.MakeGenericMethod(consumer);

                            // Arguments are (endpoint configurator, context, null); nothing happens when
                            // no matching method was found.
                            generic?.Invoke(e, new object[] { e, context, null });
                        }
                    });
                }
            }
        });
    });
    return services;
}
IntegrationEvent, which is excluded from topology, is the base type of all my integration events. I am trying to add consumers dynamically, but I could not figure out what's wrong.
Any help would be appreciated.
CodePudding user response:
Simple answer: you can't call ReceiveEndpoint() with the same queue name twice.
Better answer, your configuration is a mess, and all that reflection is unnecessary. I've simplified it for you.
/// <summary>
/// Registers MassTransit with the RabbitMQ transport and attaches every consumer of an
/// <c>IntegrationEvent</c>-derived message to a single receive endpoint named by
/// <c>RabbitSettings.QueueName</c>.
/// </summary>
/// <param name="services">The service collection MassTransit is added to.</param>
/// <param name="assembly">
/// Not used by this implementation (consumers are discovered across all loaded assemblies);
/// retained so existing callers keep compiling.
/// </param>
/// <returns>The same <paramref name="services"/> instance.</returns>
public static IServiceCollection AddMassTransit(this IServiceCollection services, Assembly assembly)
{
    services.AddMassTransit(configure =>
    {
        configure.SetKebabCaseEndpointNameFormatter();

        var allTypes = AppDomain.CurrentDomain.GetAssemblies().SelectMany(x => x.GetTypes()).ToList();

        // Message types: every loaded type whose direct base type is IntegrationEvent.
        var eventTypes = allTypes.Where(x => x.BaseType == typeof(IntegrationEvent)).ToList();

        // Consumer types: anything assignable to IConsumer<T> for at least one of those messages.
        // Materialized as an array because it is handed to AddConsumers as a Type[] argument.
        var consumerTypes = allTypes
            .Where(x => eventTypes.Any(et => x.IsAssignableTo(typeof(IConsumer<>).MakeGenericType(et))))
            .ToArray();

        configure.AddConsumers(consumerTypes);

        configure.UsingRabbitMq((context, configurator) =>
        {
            // Resolve settings from the bus registration context rather than building a second
            // service provider from the collection; GetRequiredService throws a descriptive
            // exception if the options are not registered.
            var rabbitSettings = context.GetRequiredService<IOptions<RabbitSettings>>().Value;
            var host = new Uri("rabbitmq://" + rabbitSettings.EventBusConnection);
            configurator.Host(host, h =>
            {
                h.Username(rabbitSettings.EventBusUserName);
                h.Password(rabbitSettings.EventBusPassword);
            });

            // No consumers found: do not declare the receive endpoint at all.
            if (consumerTypes.Length == 0)
            {
                return;
            }

            // rabbitSettings.QueueName => service-b
            // Declared exactly once; all registered consumers share this endpoint.
            configurator.ReceiveEndpoint(rabbitSettings.QueueName, e =>
            {
                e.UseConsumeFilter(typeof(InboxFilter<>), context);
                e.ConfigureConsumers(context);
            });
        });
    });
    return services;
}