Home > Software design >  How to Schedule in the Initial state of MassTransit saga
How to Schedule in the Initial state of MassTransit saga

Time:02-25

I created a state machine saga that will receive multiple messages and only after a given time period elapses, I want it to continue its work. I figured the only way to do it with mass transit is to go with the scheduling capabilities of the framework.

The saga code (shortened for brevity) is given below:

public class CheckFeedSubmissionStateMachine : MassTransitStateMachine<CheckFeedSubmissionState>
{
    public State? WaitingForTimeoutExpiration { get; private set; }

    public State? FetchingSubmissionData { get; private set; }

    public Event<CheckFeedSubmissionCommand> CheckFeedSubmissionCommandReceived { get; private set; }

    public Event<FeedSubmissionListReceivedEvent> FeedSubmissionListReceived { get; private set; }

    public Event<FeedSubmissionListErrorReceivedEvent> FeedSubmissionListErrorReceived { get; private set; }

    public Event<FeedSubmissionResultReceivedEvent> FeedSubmissionResultReceived { get; private set; }

    public Event<FeedSubmissionResultErrorReceivedEvent> FeedSubmissionResultErrorReceived { get; private set; }

    public Schedule<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> ScheduleCompletionTimeout { get; private set; }

    private readonly int _scheduleDelay;
    
    public CheckFeedSubmissionStateMachine(IOptions<SagasOptions> options)
    {
        _scheduleDelay = int.Parse(options.Value.CheckFeedSubmissionStateMachine["ScheduleDelay"]);

        Configure();
        BuildProcess();
    }

    private void Configure()
    {
        Event(
            () => CheckFeedSubmissionCommandReceived,
            e => e.CorrelateById(x => x.Message.PartnerGuid));
        Schedule(() => ScheduleCompletionTimeout, instance => instance.SchedulingCompletionTimeoutTokenId, s =>
        {
            s.Delay = TimeSpan.FromSeconds(_scheduleDelay);
            s.Received = r => r.CorrelateById(context => context.Message.CorrelationId);
        });
        InstanceState(state => state.CurrentState);
    }

    private void BuildProcess()
    {
        Initially(
            When(CheckFeedSubmissionCommandReceived)
                .Then(InitializeState)
                .Then(StoreSubmissionIds)
                .Schedule(ScheduleCompletionTimeout, ScheduleEvent)
                .TransitionTo(WaitingForTimeoutExpiration));
        
        During(WaitingForTimeoutExpiration,
            When(CheckFeedSubmissionCommandReceived)
                .Then(StoreSubmissionIds),
            When(ScheduleCompletionTimeout.Received)
                .Activity(QueueGetFeedSubmissionListRequest)
                .TransitionTo(FetchingSubmissionData));

        // the rest ommited for brevity
    }

    private void InitializeState(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx) =>
        ctx.Instance.PartnerId = ctx.Data.PartnerId;

    private void StoreSubmissionIds(BehaviorContext<CheckFeedSubmissionState, CheckFeedSubmissionCommand> ctx)
    {
        ctx.Instance.SubmissionIdToStatusMap[ctx.Data.FeedSubmissionId] = FeedProcessingStatus.Submitted;
        ctx.Instance.SubmissionIdsToCorrelationIdsMap[ctx.Data.FeedSubmissionId] = ctx.Data.CorrelationId;
    }

    private Task<SchedulingCompletionTimeoutExpired> ScheduleEvent<TEvent>(
        ConsumeEventContext<CheckFeedSubmissionState, TEvent> ctx) where TEvent : class =>
        ctx.Init<SchedulingCompletionTimeoutExpired>(new { ctx.Instance.CorrelationId });

    private EventActivityBinder<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> QueueGetFeedSubmissionListRequest(
        IStateMachineActivitySelector<CheckFeedSubmissionState, SchedulingCompletionTimeoutExpired> sel) =>
        sel.OfType<QueueGetFeedSubmissionListActivity>();
}

The one test that I created for it aims at checking if both published messages have been preserved in the saga, the code below:

    [Fact]
    public async Task GivenCheckFeedSubmissionCommand_WhenAnotherCheckFeedSubmissionCommandIsReceived_ThenTheSagaStoresBothSubmissionIds()
    {
        var (harness, sagaHarness) = GetTestComponents();
        var partnerGuid = Guid.NewGuid();

        await harness.Start();

        try
        {
            await harness.Bus.Publish(GetInitiatingEvent("1", partnerGuid));
            await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 1);
            await harness.Bus.Publish(GetInitiatingEvent("2", partnerGuid));
            await Consumption<CheckFeedSubmissionCommand>(harness, sagaHarness, 2);
            
            var state = sagaHarness.Sagas.Contains(partnerGuid);

            state.CurrentState.Should().Be("WaitingForTimeoutExpiration");
            state.SubmissionIdsToCorrelationIdsMap.Should().ContainKeys("1", "2");
        }
        finally
        {
            await harness.Stop();
        }
    }

    private static (InMemoryTestHarness, IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>) GetTestComponents() =>
        TestHarnessFactory.Create<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine>(
            sp => sp
                .AddSingleton(Options.Create(new SagasOptions
                {
                    CheckFeedSubmissionStateMachine = new Dictionary<string, string>
                    {
                        ["ScheduleDelay"] = "0"
                    }
                })));

    private static CheckFeedSubmissionCommand GetInitiatingEvent(string feedSubmissionId, Guid partnerGuid) =>
        new(Guid.NewGuid(), "1", partnerGuid, feedSubmissionId);

    private static async Task Consumption<TEvent>(
        InMemoryTestHarness harness,
        IStateMachineSagaTestHarness<CheckFeedSubmissionState, CheckFeedSubmissionStateMachine> sagaHarness,
        int expectedCount)
        where TEvent : class
    {
        if (expectedCount == 1)
        {
            var harnessConsumed = harness.Consumed.SelectAsync<TEvent>().Any();
            var sagaConsumed = sagaHarness.Consumed.SelectAsync<TEvent>().Any();

            await Task.WhenAll(harnessConsumed, sagaConsumed);
        }
        else
        {
            int harnessConsumedCount;
            int sagaConsumedCount;

            do
            {
                var harnessConsumedTask = harness.Consumed.SelectAsync<TEvent>().Count();
                var sagaConsumedTask = sagaHarness.Consumed.SelectAsync<TEvent>().Count();

                harnessConsumedCount = await harnessConsumedTask;
                sagaConsumedCount = await sagaConsumedTask;

                await Task.Delay(1000);
            } while (harnessConsumedCount < expectedCount && sagaConsumedCount < expectedCount);
        }
    }

The problem is that when I invoke this line .Schedule(ScheduleCompletionTimeout, ScheduleEvent) in the Initially/When phase, it somehow interferes with state switching and the saga does not switch to the next state - it stays in the Initial state indefinitely. I confirmed it both by inspecting the state variable in the test and by setting a breakpoint in the InitializeState method - it gets hit twice. When I remove that line doing the scheduling, the test passes, though I can't do that, because I need it. Any help?

CodePudding user response:

It's likely you don't have a scheduler configured for the bus with the test harness. If you had logging enabled for the test, you'd see the error in the logs.

The bus configuration for the test harness should let you add the scheduler:

configurator.UseDelayedMessageScheduler();

There is a configuration event on the test harness, OnConfigureInMemoryBus or something like that, which you can use to configure the bus.

  • Related