MassTransit The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state

Time:10-31

I'm trying to build a fully working example that uses a MassTransit state machine to orchestrate fully decoupled services, and I'm running into the following exception: The Step1FinishedEvent Event event is not handled during the ProcessingStartedState state for the ArcStateMachine state machine. While debugging, it looks like the events fired by the messages (which are also consumed by the consumers) are handled by the state machine too late.

My definitions:

// StateMachine processing instance definition
[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    public Guid CorrelationId { get; set; }

    public string CurrentState { get; set; }

    public int Version { get; set; }

    public Guid ActivationId { get; set; }
}

// Contracts correlations definitions for message exchange of the state machine
public static class MessageContracts
{
    static bool _initialized;

    public static void Initialize()
    {
        if (_initialized)
            return;

        GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<StartStep1Message>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<Step1FinishedMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<StartStep2Message>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<Step2FinishedMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(x => x.ActivationId);

        _initialized = true;
    }
}

// Step1ConsumerDefinition: keeps fault messages for Step1Consumer from being routed to the _error RMQ queue
public class Step1ConsumerDefinition : ConsumerDefinition<Step1Consumer>
{
    public Step1ConsumerDefinition()
    {
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<Step1Consumer> consumerConfigurator)
    {
        endpointConfigurator.DiscardFaultedMessages();
    }
}

My 2 actual processing consumers:

public class Step1Consumer : IConsumer<StartStep1Message>
{
    readonly ILogger<Step1Consumer> _Logger;

    private readonly int _DelaySeconds = 5;

    public Step1Consumer(ILogger<Step1Consumer> logger)
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<StartStep1Message> context)
    {
        var activationId = context.Message.ActivationId;

        _Logger.LogInformation($"Step 1 started: {activationId}");

        await Task.Delay(_DelaySeconds * 1000);

        _Logger.LogInformation($"Step 1 finished: {activationId}");

        await context.Publish<Step1FinishedMessage>(new { ActivationId = activationId });
    }
}

public class Step2Consumer : IConsumer<StartStep2Message>
{
    readonly ILogger<Step2Consumer> _Logger;

    private readonly int _DelaySeconds = 1;

    public Step2Consumer(ILogger<Step2Consumer> logger)
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<StartStep2Message> context)
    {
        var activationId = context.Message.ActivationId;

        _Logger.LogInformation($"Step 2 started {activationId}");

        await Task.Delay(_DelaySeconds * 1000);

        _Logger.LogInformation($"Step 2 finished {activationId}");

        await context.Publish<Step2FinishedMessage>(new { ActivationId = activationId });
    }
}

I also have 2 auxiliary consumers that orchestrate the transition of messages between the different services, so that those stay decoupled, and that detect when all the processing has finished:

public class TransitionConsumer : 
    IConsumer<StartProcessingMessage>, 
    IConsumer<Step1FinishedMessage>, 
    IConsumer<Step2FinishedMessage>
{
    readonly ILogger<TransitionConsumer> _Logger;

    private readonly int _DelaySeconds = 5;

    public TransitionConsumer(
        ILogger<TransitionConsumer> logger
        )
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Started to Step 1: {activationId}");
        await context.Publish<StartStep1Message>(new { ActivationId = activationId });
    }

    public async Task Consume(ConsumeContext<Step1FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Step 1 to Step 2: {activationId}");
        await context.Publish<StartStep2Message>(new { ActivationId = activationId });
    }

    public async Task Consume(ConsumeContext<Step2FinishedMessage> context)
    {
        var activationId = context.Message.ActivationId;
        _Logger.LogInformation($"Transition from Step 2 to Completion: {activationId}");
        await context.Publish<ProcessingFinishedMessage>(new { ActivationId = activationId });
    }
}

public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    readonly ILogger<ProcessingFinishedConsumer> _Logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        _Logger.LogInformation($"Finish {context.Message.ActivationId}");

        await Task.CompletedTask;
    }
}

And a Fault<> consumer that deals with all faults that may come from Step1Consumer and Step2Consumer:

public class FaultConsumer : 
    IConsumer<Fault<StartStep1Message>>, 
    IConsumer<Fault<StartStep2Message>>
{
    readonly ILogger<FaultConsumer> _Logger;

    public FaultConsumer(ILogger<FaultConsumer> logger)
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<Fault<StartStep1Message>> context)
    {
        await LogError("Step 1", context.Message.Message.ActivationId, context.Message.Exceptions);
    }

    public async Task Consume(ConsumeContext<Fault<StartStep2Message>> context)
    {
        await LogError("Step 2", context.Message.Message.ActivationId, context.Message.Exceptions);
    }

    // Nothing is awaited here, so return a completed task instead of marking the method async.
    private Task LogError(string step, Guid activationId, ExceptionInfo[] exceptions)
    {
        var errorMessages = string.Join(", ", exceptions.Select(e => e.Message));
        _Logger.LogInformation($"{step} failed for {activationId}, cause: {errorMessages}");
        return Task.CompletedTask;
    }
}

Here's the state machine definition:

public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    public ArcStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(StartProcessingEvent)
            .Then(context =>
            {
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(ProcessingStartedState));

        During(ProcessingStartedState,
            When(Step1StartedEvent)
            .Then(context =>
            {
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(Step1StartedState));

        During(Step1StartedState,
            When(Step1FinishedEvent)
            .Then(context =>
            {
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(Step1FinishedState));

        During(Step1FinishedState,
            When(Step2StartedEvent)
            .Then(context =>
            {
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(Step2StartedState));

        During(Step2StartedState,
            When(Step2FinishedEvent)
            .Then(context =>
            {
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(Step2FinishedState));

        During(Step2FinishedState,
            When(ProcessingFinishedEvent)
            .Then(context =>
            {
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .Finalize());
    }

    public State ProcessingStartedState { get; }
    public State Step1StartedState { get; }
    public State Step1FinishedState { get; }
    public State Step2StartedState { get; }
    public State Step2FinishedState { get; }

    public Event<StartProcessingMessage> StartProcessingEvent { get; }
    public Event<StartStep1Message> Step1StartedEvent { get; }
    public Event<Step1FinishedMessage> Step1FinishedEvent { get; }
    public Event<StartStep2Message> Step2StartedEvent { get; }
    public Event<Step2FinishedMessage> Step2FinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

And the setup of MassTransit:

        var rabbitHost = Configuration["RABBIT_MQ_HOST"];

        if (rabbitHost.IsNotEmpty())
        {
            services.AddMassTransit(cnf =>
            {
                var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

                cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
                    .Endpoint(e => e.Name = BusConstants.SagaQueue)
                    .MongoDbRepository(connectionString, r =>
                    {
                        r.DatabaseName = "mongo";
                        r.CollectionName = "WorkflowState";
                    });

                cnf.AddConsumer(typeof(TransitionConsumer));
                cnf.AddConsumer(typeof(Step1Consumer), typeof(Step1ConsumerDefinition));
                cnf.AddConsumer(typeof(Step2Consumer));
                cnf.AddConsumer(typeof(ProcessingFinishedConsumer));
                cnf.AddConsumer(typeof(FaultConsumer));

                //cnf.AddMessageScheduler(schedulerEndpoint);

                cnf.UsingRabbitMq((context, cfg) =>
                {
                    cfg.Host(new Uri(rabbitHost), hst =>
                    {
                        hst.Username("guest");
                        hst.Password("guest");
                    });

                    //cfg.UseMessageScheduler(schedulerEndpoint);

                    cfg.ConfigureEndpoints(context);
                });
            });

            services.AddMassTransitHostedService();

            services.AddSwaggerGen(c =>
            {
                c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
            });
        }

So actually the message flow should be as follows:

  1. A controller publishes the StartProcessingMessage which is consumed by TransitionConsumer which in turn publishes the StartStep1Message.
  2. Step1Consumer gets the message, does its job and publishes the Step1FinishedMessage.
  3. TransitionConsumer gets the message and publishes the StartStep2Message.
  4. Step2Consumer gets the message, does its job and publishes the Step2FinishedMessage.
  5. TransitionConsumer gets the message and publishes the ProcessingFinishedMessage which is consumed by the ProcessingFinishedConsumer.

In this scenario Step1Consumer and Step2Consumer do not know about each other, and the TransitionConsumer alone is responsible for the transitions between steps. Meanwhile the state machine tracks every message and passes through all the respective states.

The problem arises right at the beginning: the TransitionConsumer publishes the StartStep1Message before the ArcStateMachine starts to handle the StartProcessingEvent, which I expected to be fired first. This leads to a situation in which the state machine gets stuck in ProcessingStartedState. As a result, when the Step1FinishedEvent is published, the machine is not in Step1StartedState, which it should have entered when the StartStep1Message fired the Step1StartedEvent.

How can I solve this issue?

CodePudding user response:

You should create a Saga Definition for your state machine, so that you can configure message retry and the in-memory outbox.

Within that definition, add the retry/outbox directly to the receive endpoint as shown below.

endpointConfigurator.UseMessageRetry(r => r.Interval(3,1000));
endpointConfigurator.UseInMemoryOutbox();

This should deal with any concurrency issues in the saga. The consumer is likely receiving the message and producing the event, and that event is being dispatched to the saga before the saga has finished processing the event that triggered the command to the consumer. Yes, it's that quick.
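
For illustration, here is a minimal sketch of what such a saga definition might look like for the ArcProcess instance from the question. It assumes MassTransit v7, which the question's setup code suggests; newer versions move these types into the MassTransit namespace and add a registration context parameter to ConfigureSaga. The class name ArcStateMachineDefinition is hypothetical and does not come from the question.

```csharp
using GreenPipes;               // v7: retry policy configuration (r.Interval)
using MassTransit;
using MassTransit.Definition;   // v7: SagaDefinition<T>
using MassTransit.Saga;

// Hypothetical name; any class deriving from SagaDefinition<ArcProcess> would do.
public class ArcStateMachineDefinition : SagaDefinition<ArcProcess>
{
    protected override void ConfigureSaga(
        IReceiveEndpointConfigurator endpointConfigurator,
        ISagaConfigurator<ArcProcess> sagaConfigurator)
    {
        // Redeliver a failed message up to 3 times, waiting 1000 ms between attempts.
        endpointConfigurator.UseMessageRetry(r => r.Interval(3, 1000));

        // Hold messages published or sent while handling an event
        // until the handling has completed successfully.
        endpointConfigurator.UseInMemoryOutbox();
    }
}

// Registration, replacing the AddSagaStateMachine call in the question's setup
// (the Endpoint and MongoDbRepository calls stay as they are):
//
//   cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>(typeof(ArcStateMachineDefinition))
//       .Endpoint(e => e.Name = BusConstants.SagaQueue)
//       .MongoDbRepository(connectionString, r => { /* as before */ });
```

The retry is configured before the outbox, in the same order as the two lines in the answer, so that each retried attempt starts with an empty outbox.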
