MassTransit messages types must not be System types exception

Time:10-27

I'm pretty new to MassTransit and don't understand what I am doing wrong to get the following exception: Messages types must not be System types.

Here are my definitions:

[BsonIgnoreExtraElements]
public class ArcProcess : SagaStateMachineInstance, ISagaVersion
{
    public Guid CorrelationId { get; set; }

    public string CurrentState { get; set; }

    public int Version { get; set; }

    public Guid ActivationId { get; set; }
}

public static class MessageContracts
{
    static bool _initialized;

    public static void Initialize()
    {
        if (_initialized)
            return;

        GlobalTopology.Send.UseCorrelationId<StartProcessingMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<ReconstructionFinishedMessage>(x => x.ActivationId);
        GlobalTopology.Send.UseCorrelationId<ProcessingFinishedMessage>(x => x.ActivationId);

        _initialized = true;
    }
}

Two of my consumers are:

public class StartReconstructionConsumer : IConsumer<StartProcessingMessage>
{
    readonly ILogger<StartReconstructionConsumer> _Logger;

    private readonly int _DelaySeconds = 5;

    public StartReconstructionConsumer(ILogger<StartReconstructionConsumer> logger)
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<StartProcessingMessage> context)
    {
        var activationId = context.Message.ActivationId;

        _Logger.LogInformation($"Received Scan: {activationId}");

        await Task.Delay(_DelaySeconds * 1000);

        _Logger.LogInformation($"Finish Scan: {activationId}");

        await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId });
    }
}

public class ProcessingFinishedConsumer : IConsumer<ProcessingFinishedMessage>
{
    readonly ILogger<ProcessingFinishedConsumer> _Logger;

    public ProcessingFinishedConsumer(ILogger<ProcessingFinishedConsumer> logger)
    {
        _Logger = logger;
    }

    public async Task Consume(ConsumeContext<ProcessingFinishedMessage> context)
    {
        _Logger.LogInformation($"Finish {context.Message.ActivationId}");

        await Task.CompletedTask;
    }
}

And here is the StateMachine definition:

public class ArcStateMachine: MassTransitStateMachine<ArcProcess>
{
    static ArcStateMachine()
    {
        MessageContracts.Initialize();
    }

    public ArcStateMachine()
    {
        InstanceState(x => x.CurrentState);

        Initially(
            When(ProcessingStartedEvent)
            .Then(context =>
            {
                Console.WriteLine(">> ProcessingStartedEvent");
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .TransitionTo(ProcessingStartedState));

        During(ProcessingStartedState,
            When(ReconstructionFinishedEvent)
            .Then(context =>
            {
                Console.WriteLine(">> ReconstructionFinishedEvent");
                context.Instance.ActivationId = context.Data.ActivationId;
            })
            .Publish(context =>
            {
                return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
            })
            .TransitionTo(ProcessingFinishedState)
            .Finalize());
    }

    public State ProcessingStartedState { get; }
    public State ReconstructionStartedState { get; }
    public State ReconstructionFinishedState { get; }
    public State ProcessingFinishedState { get; }

    public Event<StartProcessingMessage> ProcessingStartedEvent { get; }
    public Event<ReconstructionStartedMessage> ReconstructionStartedEvent { get; }
    public Event<ReconstructionFinishedMessage> ReconstructionFinishedEvent { get; }
    public Event<ProcessingFinishedMessage> ProcessingFinishedEvent { get; }
}

And the setup for MassTransit looks like the following:

var rabbitHost = Configuration["RABBIT_MQ_HOST"];

if (rabbitHost.IsNotEmpty())
{
    services.AddMassTransit(cnf =>
    {
        var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

        var machine = new ArcStateMachine();
        var repository = MongoDbSagaRepository<ArcProcess>.Create(connectionString,
            "mongoRepo", "WorkflowState");

        cnf.AddConsumer(typeof(StartReconstructionConsumer));
        cnf.AddConsumer(typeof(ProcessingFinishedConsumer));

        cnf.UsingRabbitMq((context, cfg) =>
        {
            cfg.Host(new Uri(rabbitHost), hst =>
            {
                hst.Username("guest");
                hst.Password("guest");
            });

            cfg.ConfigureEndpoints(context);

            cfg.ReceiveEndpoint(BusConstants.SagaQueue,
                e => e.StateMachineSaga(machine, repository));
        });
    });

    services.AddMassTransitHostedService();

    services.AddSwaggerGen(c =>
    {
        c.SwaggerDoc("v1", new OpenApiInfo { Title = "MyApp", Version = "v1" });
    });
}

I have several questions about it:

  1. When is the event actually raised as a result of publishing a message? In my example, await _BusInstance.Bus.Publish<StartProcessingMessage>(new { ActivationId = id }); is called from a WebApi and the message is consumed by StartReconstructionConsumer, but when does the state machine actually start to act on Initially(When(ProcessingStartedEvent)...?

  2. My processing must already be in the ProcessingStartedState state for During(ProcessingStartedState, When(ReconstructionFinishedEvent)... to act correctly. So how do I ensure that my consumer, which fires on receipt of StartProcessingMessage, can publish the ReconstructionFinishedMessage that should trigger that During? Am I building the message exchange correctly?

  3. Currently, for await context.Publish<ReconstructionFinishedMessage>(new { ActivationId = activationId }); I get an entry in the logs that states R-FAULT rabbitmq://localhost/saga.service d4070000-7b3b-704d-0f10-08d99942c959 Nanox.GC.Shared.AppCore.Messages.ReconstructionFinishedMessage ReconCaller.Saga.ArcProcess(00:00:04.1132604), where the GUID is actually the MessageId. My message in RabbitMQ is then routed to saga.service_error with the exception Messages types must not be System types: System.Threading.Tasks.Task<Nanox.GC.Shared.AppCore.Messages.ProcessingFinishedMessage> (Parameter 'T').

It seems like I'm missing something really big here.

My intent is to initiate processing that has several stages, handled sequentially by a few consumers. So I tried to build a simple StateMachine that starts whenever someone calls StartProcessing; each consumer then does its job and fires FinishedStepX, which advances the state machine to a new step and initiates the next consumer, until all the processing is done and the state machine reports ProcessingComplete.

Thanks in advance for any help.

CodePudding user response:

First, your bus configuration is a bit strange, so I've cleaned that up:

services.AddMassTransit(cnf =>
{
    var connectionString = Configuration["MONGO_DB_CONNECTION_STRING"];

    cnf.AddSagaStateMachine<ArcStateMachine, ArcProcess>()
        .Endpoint(e => e.Name = BusConstants.SagaQueue)
        .MongoDbRepository(connectionString, r =>
        {
            r.DatabaseName = "mongoRepo";
            r.CollectionName = "WorkflowState";
        });

    cnf.AddConsumer<StartReconstructionConsumer>();
    cnf.AddConsumer<ProcessingFinishedConsumer>();

    cnf.UsingRabbitMq((context, cfg) =>
    {
        cfg.Host(new Uri(rabbitHost), hst =>
        {
            hst.Username("guest");
            hst.Password("guest");
        });

        cfg.ConfigureEndpoints(context);
    });
});

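The publish issue comes from the method being used: only PublishAsync allows the use of message initializers. Here context.Init<ProcessingFinishedMessage>(...) returns a Task<ProcessingFinishedMessage>, so passing it to Publish makes the Task itself the message type, which is exactly the System type named in the exception. Use PublishAsync instead: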

During(ProcessingStartedState,
    When(ReconstructionFinishedEvent)
        .Then(context =>
        {
            Console.WriteLine(">> ReconstructionFinishedEvent");
            context.Instance.ActivationId = context.Data.ActivationId;
        })
        .PublishAsync(context =>
        {
            return context.Init<ProcessingFinishedMessage>(new { ActivationId = context.Data.ActivationId });
        })
        .TransitionTo(ProcessingFinishedState)
        .Finalize());
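
To see why the original Publish call ended up with a Task as the message type, here is a minimal sketch of the generic type inference involved. It does not use MassTransit at all: InferenceDemo, its Publish, PublishAsync and Init methods, and the local ProcessingFinishedMessage class are hypothetical stand-ins that only mimic the shape of the real methods, so treat it as an illustration rather than the library's implementation.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical stand-in for the real message contract.
public class ProcessingFinishedMessage
{
    public Guid ActivationId { get; set; }
}

// Hypothetical stand-ins that only mimic the shape of the state machine methods.
public static class InferenceDemo
{
    // Like a synchronous Publish: T is whatever the factory returns.
    public static Type Publish<T>(Func<T> factory) => typeof(T);

    // Like PublishAsync: the factory returns Task<T>, so T is the inner type.
    public static Type PublishAsync<T>(Func<Task<T>> factory) => typeof(T);

    // Like a message initializer: the message is produced asynchronously.
    public static Task<T> Init<T>() where T : new() => Task.FromResult(new T());

    public static void Main()
    {
        // T is inferred as Task<ProcessingFinishedMessage>, a System type.
        Console.WriteLine(Publish(() => Init<ProcessingFinishedMessage>()));

        // T is inferred as ProcessingFinishedMessage, the actual contract.
        Console.WriteLine(PublishAsync(() => Init<ProcessingFinishedMessage>()));
    }
}
```

The first call reports a Task<ProcessingFinishedMessage> type and the second reports the message type itself, which mirrors the difference between the failing Publish call and the PublishAsync call above.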

That should sort you out.
