MassTransit Saga - return Current State with REST API response

Time:03-30

I'm using MassTransit Saga State Machine with RabbitMQ transport and EntityFramework repository.

My issue is the following: when a user hits an endpoint in my REST API (.NET 6 Minimal API), my business logic publishes an event to the State Machine, which changes the State. I want to return the current (new) state to the end user in the response, but I can't find a way to do that. Note that my State Machine implementation is in the same REST API project.

So, what I have: Initially, I couldn't find any way to get the Current State from the Storage (in my case a SQL Server DB). So I've created an EF method to do that:

public class StateRepository<StateInstance> : IStateRepository<StateInstance> where StateInstance : SqlServerStateInstance
{
    protected readonly DbContext _stateDBContext;

    public StateRepository(DbContext stateDBContext)
    {
        _stateDBContext = stateDBContext;
    }

    public async Task<StateInstance> GetCurrentStateAsync (Guid correlationId)
    {
        return await _stateDBContext.Set<StateInstance>().Where(i => i.CorrelationId == correlationId).FirstOrDefaultAsync();
    }
}

Which works OK... If I have the State already set. :)

So, I added a Middleware, which calls the Repository to get the state by ID and set it in the Response's header:

internal class StateMiddleware
{
    private readonly RequestDelegate _next;
    private readonly ILogger<StateMiddleware> _logger;

    public StateMiddleware(RequestDelegate next, ILogger<StateMiddleware> logger)
    {
        _next = next;
        _logger = logger;
    }
    
    public async Task Invoke(HttpContext context, IStateHandler stateHandler)
    {
        await _next(context);
        
        string state = await stateHandler.GetCurrentState(correlationID);
        context.Response.Headers.Add("X-State", state ?? "NULL?!?");
    }
}

Which is registered with WebApplication.UseMiddleware<StateMiddleware>();

But when I publish an event (from any endpoint) to update the state, like:

app.MapPost(RoutePrefix + "/sm/call-event/{userId}", async (IPublishEndpoint _publishEndpoint, Guid userId, StateEvents evn) =>
        {
            switch (evn)
            {
                case StateEvents.Initiate:
                    {
                        await _publishEndpoint.Publish<StateInstance>(new
                        {
                            CorrelationId = userId
                        }, default);
                        break;
                    }
                case StateEvents.CollectEmail:
                    {
                        await _publishEndpoint.Publish<CollectEmailData>(new
                        {
                            ActiveAccountExists = false,
                            CorrelationId = userId
                        }, default);
                        break;
                    }
                case StateEvents.ConfirmPass:
                    {
                        await _publishEndpoint.Publish<ConfirmPass>(new
                        {
                            CorrelationId = userId
                        }, default);
                        break;
                    }
            }
            return Results.Ok();
        });

The State is NULL (if I call the endpoint with StateEvents.Initiate) or the previous value (with any other value from the enum).

And here is my Mass Transit configuration:

services.AddMassTransit(cfg=> {
            cfg.AddSagaStateMachine<MyStateMachine, MyStateInstance>()
                .EntityFrameworkRepository(r =>
                {
                    r.ConcurrencyMode = ConcurrencyMode.Optimistic; //requires RowVersion!

                    r.AddDbContext<DbContext, StateRepository>((provider, builder) =>
                    {
                        builder.UseSqlServer(connectionString, m =>
                        {
                            m.MigrationsAssembly(typeof(StateRepository).Assembly.GetName().Name);
                            m.MigrationsHistoryTable($"__{typeof(StateRepository).Name}");
                        });
                    });
                });
            cfg.UsingRabbitMq((context, x) =>
            {
                x.ConfigureEndpoints(context);
                x.Host("localhost",
                    h =>
                    {
                        h.Username("guest");
                        h.Password("guest");
                    }
                );
            });
        });
        services.AddMassTransitHostedService();

I know that the State Machine is event-based and therefore asynchronous, but is there any workaround?

Thank you!

CodePudding user response:

You should never access that saga repository directly for that type of information. An instance's current state, and any other data, is owned by the saga. Attempts to read/access the state, or in the worst case modify the state, are misguided and likely to introduce all sorts of inconsistencies.

MassTransit has an easy facility for accessing the state of a saga. You can define a request contract, which is handled by the saga, allowing the saga itself to respond with the current state/details. That request can even be specified as ReadOnly, so that it doesn't modify the state (or attempt to save the unmodified state).

The example below includes a status check event that responds to the request:

class ReadOnlyStateMachine :
    MassTransitStateMachine<ReadOnlyInstance>
{
    public ReadOnlyStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Event(() => StatusCheckRequested, x =>
        {
            x.ReadOnly = true;
        });

        Initially(
            When(Started)
                .Then(context => context.Instance.StatusText = "Started")
                .Respond(context => new StartupComplete {CorrelationId = context.Instance.CorrelationId})
                .TransitionTo(Running)
        );

        During(Running,
            When(StatusCheckRequested)
                .Respond(context => new Status
                {
                    CorrelationId = context.Instance.CorrelationId,
                    StatusText = context.Instance.StatusText
                })
                .Then(context => context.Instance.StatusText = "Running") // this change won't be saved
        );
    }

    public State Running { get; private set; }
    public Event<Start> Started { get; private set; }
    public Event<CheckStatus> StatusCheckRequested { get; private set; }
}

In the controller, simply use the request client to get the status of the state machine:

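public class SomeController : ControllerBase
{
    private readonly IRequestClient<CheckStatus> _client;

    public SomeController(IRequestClient<CheckStatus> client)
    {
        _client = client;
    }

    public async Task<IActionResult> DoSomething(Guid id)
    {
        // Ask the saga for its status; the saga replies with a Status message
        var response = await _client.GetResponse<Status>(new { CorrelationId = id });
        return Ok(response.Message);
    }
}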
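
Since the question uses a Minimal API rather than controllers, the same request can be sketched as an endpoint. This is only a rough sketch under assumptions: `app` is the WebApplication from the question, the route "/sm/state/{userId}" is hypothetical, the CheckStatus and Status classes are guessed shapes (the example above uses the names but does not show the types), and IRequestClient<CheckStatus> is assumed to be resolvable from the container, for instance by calling cfg.AddRequestClient<CheckStatus>() inside AddMassTransit.

```csharp
using MassTransit;

// Hypothetical route, placed next to the question's other endpoints in Program.cs.
app.MapGet("/sm/state/{userId}", async (IRequestClient<CheckStatus> client, Guid userId) =>
{
    try
    {
        // The saga handles CheckStatus and replies with Status.
        Response<Status> response =
            await client.GetResponse<Status>(new { CorrelationId = userId });
        return Results.Ok(response.Message);
    }
    catch (RequestTimeoutException)
    {
        // No reply arrived in time, for example because no saga
        // instance in a matching state handled the request.
        return Results.StatusCode(StatusCodes.Status504GatewayTimeout);
    }
});

// Assumed contract shapes (not shown in the original answer).
public class CheckStatus
{
    public Guid CorrelationId { get; set; }
}

public class Status
{
    public Guid CorrelationId { get; set; }
    public string? StatusText { get; set; }
}
```

The same GetResponse call could also be made at the end of the existing call-event endpoint, after the Publish, instead of returning Results.Ok() with no body. Whether the reply already reflects the event published a moment earlier depends on how the saga endpoint processes its messages, so treat that ordering as something to verify rather than a guarantee.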

That should be enough to get you started. If not, here is a controller example, and the corresponding event and handler. (The sample doesn't have the ReadOnly property set on the event, but you should add it).
