Home > Back-end >  Does MassTransit Transactional Outbox Work with Multi Bus scenarios
Does MassTransit Transactional Outbox Work with Multi Bus scenarios

Time:12-04

There is a situation that I have separate CommandBus and EventBus, I have a consumer that listens to the commands in the command bus and after the operation is handled it publishes related events to the Event Bus. I want to have the built-in support of the transactional outbox pattern on the EventBus.

Here is the link to a repo

The following is the configuration of the application:


 public static void ConfigureServices(HostBuilderContext host, IServiceCollection services)
    {
        services.Configure<MessageBrokerConfiguration>(host.Configuration.GetSection("MessageBroker"));
        var brokerConfiguration = new MessageBrokerConfiguration();
        host.Configuration.Bind("MessageBroker", brokerConfiguration);
        
        services.AddHostedService<DatabaseMigratorHostedService>();
        
        services.AddMassTransit<ICommandBus>(mt =>
        {
            mt.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host(brokerConfiguration.CommandBus);
                configurator.ConfigureEndpoints(context);
            });

            mt.AddConsumersFromNamespaceContaining<CreateOrderConsumer>();
        });
        
        services.AddMassTransit(mt =>
        {
            mt.AddEntityFrameworkOutbox<OrderContext>(options =>
            {
                options.QueryDelay = TimeSpan.FromSeconds(1);
                options.UsePostgres();
                options.UseBusOutbox();
            });
            
            mt.UsingRabbitMq((context, configurator) =>
            {
                configurator.Host(brokerConfiguration.EventBus);
                configurator.ConfigureEndpoints(context);
            });
        });

        services.AddRepositories(host.Configuration);
        services.AddScoped<IEventEmitter, MasstransitEventEmitter>();
    }

and the following is my consumer that listens to a command in one bus and publishes an event to another:

public sealed class CreateOrderConsumer
    : IConsumer<CreateOrder>
{
    private readonly IEventEmitter _eventEmitter;
    private readonly IUnitOfWork _unitOfWork;
    private readonly IRepository<Order> _repository;

    public CreateOrderConsumer(
        IRepository<Order> repository,
        IUnitOfWork unitOfWork,
        IEventEmitter eventEmitter)
    {
        _unitOfWork = Guard.Against.Null(unitOfWork);
        _repository = Guard.Against.Null(repository);
        _eventEmitter = Guard.Against.Null(eventEmitter);
    }

    public async Task Consume(ConsumeContext<CreateOrder> context)
    {
        var order = new Order(context.Message.ProductId, context.Message.Quantity);
        
        await _repository.StoreAsync(order);
        
        await _eventEmitter.Emit(order.DomainEvents);
        order.ClearDomainEvents();
        
        await _unitOfWork.CommitAsync();
        await context.RespondAsync<CreateOrderResult>(new { OrderId = order.Id });
    }

and my IEventEmitter is getting a IBus:

public sealed class MasstransitEventEmitter : IEventEmitter
{
    private readonly IPublishEndpoint _publishEndpoint;

    public MasstransitEventEmitter(IBus publishEndpoint)
    {
        _publishEndpoint = Guard.Against.Null(publishEndpoint);
    }
    
    public async Task Emit(IEnumerable<IDomainEvent> domainEvents)
    {
        try
        {
            foreach (var domainEvent in domainEvents)
            {
                await _publishEndpoint.Publish(domainEvent, domainEvent.GetType(), CancellationToken.None);
            }
        }
        catch (Exception)
        {
            // ignored
        }
    }
}

here is the DbContext that is used for the business logic and also transactional outbox configs:

public sealed class OrderContext : DbContext, IUnitOfWork
{
    public OrderContext(DbContextOptions<OrderContext> options) : base(options)
    {
    }

    internal DbSet<OrderEntity> Orders { get; private set; } = default!;

    public async Task CommitAsync(CancellationToken cancellationToken = default)
        => await this.SaveChangesAsync(cancellationToken);


    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        base.OnModelCreating(modelBuilder);
        
        modelBuilder.ApplyConfiguration(new OrderEntityConfiguration());
        
        modelBuilder.AddInboxStateEntity();
        modelBuilder.AddOutboxMessageEntity();
        modelBuilder.AddOutboxStateEntity();
    }
}

There is the API layer that sends commands to the command bus via IRequestClient<CreateOrder> and awaits to get a response back. The problem is that when the Event Bus (not command bus) is down, the transactional outbox is not working, and it continues until a time-out exception is happening.

[HttpPost]
    public async Task<IActionResult> Post(
        [FromBody] CreateOrderDto createOrderDto,
        [FromServices] IRequestClient<CreateOrder> createOrderRequestClient)
    {
        var result = await createOrderRequestClient.GetResponse<CreateOrderResult>(
            new CreateOrder{ ProductId = createOrderDto.ProductId,Quantity= createOrderDto.Quantity }, 
            timeout: RequestTimeout.After(m:2));
        return Ok(result);
    }

And the logs for the API:

info: MassTransit[0]
      Bus started: rabbitmq://localhost/
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: https://localhost:7129
info: Microsoft.Hosting.Lifetime[14]
      Now listening on: http://localhost:5134
info: Microsoft.Hosting.Lifetime[0]
      Application started. Press Ctrl C to shut down.
info: Microsoft.Hosting.Lifetime[0]
      Hosting environment: Development
info: Microsoft.Hosting.Lifetime[0]
      Content root path: /Users/shahab/dev/talks/Demo.TransactionalOutbox/Demo.TransactionalOutbox.Api

Logs for the Application Layer(Listens to the Commands and Publishes Events):

[13:33:45 INF] Configured endpoint CancelOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.CancelOrderConsumer
[13:33:45 INF] Configured endpoint CreateOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.CreateOrderConsumer
[13:33:45 INF] Configured endpoint GetOrder, Consumer: Demo.TransactionalOutbox.Application.Consumers.GetOrderConsumer
[13:33:49 DBG] Starting bus instances: ICommandBus, IBus
[13:33:49 DBG] Starting bus: rabbitmq://localhost/
[13:33:49 DBG] Starting bus: rabbitmq://localhost:6666/
[13:33:49 DBG] Connect: guest@localhost:5672/
[13:33:49 DBG] Connect: guest@localhost:6666/
[13:33:49 DBG] Connected: guest@localhost:5672/ (address: amqp://localhost:5672, local: 49955)
[13:33:49 DBG] Connected: guest@localhost:6666/ (address: amqp://localhost:6666, local: 49954)
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/McShahab_DemoTransactio_bus_5emoyydyan1f7qhobdppkkw6gp?temporary=true
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost:6666/McShahab_DemoTransactio_bus_5emoyydyan1f7jiabdppkkw9bz?temporary=true
[13:33:50 INF] Bus started: rabbitmq://localhost:6666/
[13:33:50 DBG] Declare queue: name: CancelOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare queue: name: CreateOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare queue: name: GetOrder, durable, consumer-count: 0 message-count: 0
[13:33:50 DBG] Declare exchange: name: GetOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: CreateOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: CancelOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Queries:GetOrderStatus, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CancelOrder, type: fanout, durable
[13:33:50 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CreateOrder, type: fanout, durable
[13:33:50 DBG] Bind queue: source: GetOrder, destination: GetOrder
[13:33:50 DBG] Bind queue: source: CancelOrder, destination: CancelOrder
[13:33:50 DBG] Bind queue: source: CreateOrder, destination: CreateOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CreateOrder, destination: CreateOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Queries:GetOrderStatus, destination: GetOrder
[13:33:50 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.OrderAggregate.Commands:CancelOrder, destination: CancelOrder
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/GetOrder - amq.ctag-jT06Ly0B8--gYF2XxxxyGQ
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/CreateOrder - amq.ctag-K2-6Gcdxk8z6UPxI0q-xQw
[13:33:50 DBG] Consumer Ok: rabbitmq://localhost/CancelOrder - amq.ctag-YRlkqWCWLKPX1JpCtEThJQ
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/CreateOrder
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/GetOrder
[13:33:50 INF] Bus started: rabbitmq://localhost/
[13:33:50 DBG] Endpoint Ready: rabbitmq://localhost/CancelOrder

and the Consumer of the Events:

[13:33:47 INF] Configured endpoint OrderCreated, Consumer: Demo.TransactionalOutbox.FancyConsumer.OrderCreatedConsumer
[13:33:48 DBG] Starting bus instances: IBus
[13:33:48 DBG] Starting bus: rabbitmq://localhost:6666/
[13:33:48 DBG] Connect: guest@localhost:6666/
[13:33:48 DBG] Connected: guest@localhost:6666/ (address: amqp://localhost:6666, local: 49947)
[13:33:48 DBG] Endpoint Ready: rabbitmq://localhost:6666/McShahab_DemoTransactio_bus_hrmoyydyan1fh45qbdppkkiyy5?temporary=true
[13:33:48 DBG] Declare queue: name: OrderCreated, durable, consumer-count: 0 message-count: 0
[13:33:48 DBG] Declare exchange: name: OrderCreated, type: fanout, durable
[13:33:48 DBG] Declare exchange: name: Demo.TransactionalOutbox.Domain.Events:OrderCreated, type: fanout, durable
[13:33:48 DBG] Bind queue: source: OrderCreated, destination: OrderCreated
[13:33:48 DBG] Bind exchange: source: Demo.TransactionalOutbox.Domain.Events:OrderCreated, destination: OrderCreated
[13:33:48 DBG] Consumer Ok: rabbitmq://localhost:6666/OrderCreated - amq.ctag-53c0dDTumv3l33VqwMiSpA
[13:33:48 DBG] Endpoint Ready: rabbitmq://localhost:6666/OrderCreated
[13:33:48 INF] Bus started: rabbitmq://localhost:6666/


In contrast to the sample application for the outbox pattern, I did not see any logs for the Outbox, neither when the rabbitmq is up&running nor when it is down.

CodePudding user response:

Short answer, Transactional Outbox only works with the primary (IBus) bus instance. Any additional bus instances when using MultiBus are unable to use the transactional outbox at this time.

Updated

In your event emitter, you can't use IBus as a publish endpoint because it isn't scoped. But you also can't just use IPublishEndpoint because it would probably be the ConsumeContext from the consumer on the command bus. The underlying wiring to get the transactional outbox isn't really setup to work that way from a consumer on one bus to producing events on another bus.

  • Related