How to avoid keeping message in endpoint queue when MassTransit State Machine Requests returns Reque

Time:09-03

I have a specific scenario in my app: when a new user creates an account, I need to create a new workspace for them. I also have to wait for the workspace to be created before returning success to the client, confirming that the account is created and the workspace is ready.

So I used the MassTransit request method to achieve this, and it works perfectly. As you can see here, I start my state machine with this request:

var response = await _prepareAdvertiserAccountRequest.GetResponse<AdvertiserAccountPreparedIE, AdvertiserAccountPreparationFailedIE>(new PrepareAdvertiserAccountIE
            {
                Owner = notification.Owner,
                FirstName = notification.FirstName,
                LastName = notification.LastName,
            });

Here is my state machine. I save the ResponseAddress, then the state machine request sends the message. Everything works exactly as intended: the request waits for the workspace to be created and then completes.

using MassTransit;
using EventBus.Messages.Events.Identity.AdvertiserSignUpSagaEvents;

namespace Auth.API.StateMachines.AdvertiserAccount
{
    public class AdvertiserAccountStateMachine :
        MassTransitStateMachine<AdvertiserAccountState>
    {
        public AdvertiserAccountStateMachine()
        {
            InstanceState(x => x.CurrentState);

            Event(() => PrepareAdvertiserAccount, x => x.CorrelateById(m => m.Message.Owner));

            Request(() => CreateWorkspaceRequested, x => x.CreateWorkspaceRequestId, x =>
            {
                x.ServiceAddress = new Uri("queue:create-workspace");
                x.Timeout = TimeSpan.FromSeconds(20);
            });


            Initially(
                When(PrepareAdvertiserAccount)
                .Then(context =>
                {
                    context.Saga.CreateWorkspaceRequestId = Guid.NewGuid();

                    context.Saga.AccountId = context.Message.Owner;
                    context.Saga.FirstName = context.Message.FirstName;
                    context.Saga.LastName = context.Message.LastName;

                    context.Saga.ResponseAddress = context.ResponseAddress!;
                    context.Saga.RequestId = context.RequestId;
                })
                .Request(CreateWorkspaceRequested, x => x.Init<CreateWorkspaceIE>(new CreateWorkspaceIE
                {
                    Owner = x.Saga.AccountId,
                    FirstName = x.Saga.FirstName,
                    LastName = x.Saga.LastName,
                }))
                .TransitionTo(CreateWorkspaceRequested.Pending));

            During(CreateWorkspaceRequested.Pending,
                When(CreateWorkspaceRequested.Completed)
                    .Then(context =>
                    {
                        context.Saga.WorkspaceId = context.Message.WorkspaceId;
                    })
                    .TransitionTo(PreparationCompleted),
                When(CreateWorkspaceRequested.Faulted)
                    .TransitionTo(PreparationFailed),
                When(CreateWorkspaceRequested.TimeoutExpired)
                    .TransitionTo(PreparationFailed));

        

            WhenEnter(PreparationFailed, x => x.ThenAsync(async context =>
            {
                var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                await endpoint.Send(new AdvertiserAccountPreparationFailedIE(), r => r.RequestId = context.Saga.RequestId);
            }));
            WhenEnter(PreparationCompleted, x => x.ThenAsync(async context =>
            {
                var endpoint = await context.GetSendEndpoint(context.Saga.ResponseAddress);
                await endpoint.Send(new AdvertiserAccountPreparedIE(), r => r.RequestId = context.Saga.RequestId);
            }).Finalize());

            SetCompletedWhenFinalized();
        }


        public State PreparationFailed { get; set; }
        public State PreparationCompleted { get; set; }

        public State WorkspaceCreated { get; set; }


        public Event<PrepareAdvertiserAccountIE> PrepareAdvertiserAccount { get; set; }
        public Request<AdvertiserAccountState, CreateWorkspaceIE, WorkspaceCreatedIE> CreateWorkspaceRequested { get; set; }
    }
}

But my problem is this: when the workspace service is down, the create-workspace queue is not consumed, so the request raises CreateWorkspaceRequested.TimeoutExpired. The state machine then ends the request and returns the AdvertiserAccountPreparationFailedIE event to the request client (the one we started the state machine with).

However, after CreateWorkspaceRequested.TimeoutExpired, queue:create-workspace (the ServiceAddress that the state machine request sends to) still holds the message in the ready state, so it will be consumed once the workspace service is available again. I don't want this to happen: when CreateWorkspaceRequested.TimeoutExpired is raised, I roll back the account creation, so there is no need to create the workspace any more.


CodePudding user response:

If you change your request initializer to:

.Request(CreateWorkspaceRequested, x => x.Init<CreateWorkspaceIE>(new
{
    Owner = x.Saga.AccountId,
    FirstName = x.Saga.FirstName,
    LastName = x.Saga.LastName,
    __TimeToLive = TimeSpan.FromSeconds(30)
}))

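The TimeToLive header will be set on the outgoing message, so the message expires instead of waiting in the queue for the workspace service to come back. Note that the initializer is now an anonymous object rather than a CreateWorkspaceIE instance, which is what allows the extra __TimeToLive entry.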
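
For context, the double-underscore entry in the initializer ends up as a header on the send context. Below is a minimal sketch of setting the same header explicitly, using the same Send overload that the state machine in the question already uses for its responses. The class name TimeToLiveExample and the helper SendWithTimeToLive are hypothetical, not part of MassTransit:

```csharp
using System;
using System.Threading.Tasks;
using MassTransit;

public static class TimeToLiveExample
{
    // Hypothetical helper: sends a message and sets its TimeToLive header
    // explicitly on the send context instead of via __TimeToLive.
    public static Task SendWithTimeToLive<T>(ISendEndpoint endpoint, T message, TimeSpan timeToLive)
        where T : class
    {
        // SendContext.TimeToLive is the header that the __TimeToLive
        // initializer entry populates.
        return endpoint.Send(message, context => context.TimeToLive = timeToLive);
    }
}
```

What happens to the message once the time to live elapses (dropped or dead-lettered) depends on the broker and on how the queue is configured.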
