I am trying to connect two services using RabbitMQ queue. First services pushes value to queue, second retrieves it and process. All is fine, but when second service tries to process job, it throws exception. Queue item stays in JobAttempt queue without any information, and consumer service retries to process job, but throws same exception every time.
Exception
fail: MassTransit.ReceiveTransport[0]
S-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-edb0-08d97cd26cf9 MassTransit.Contracts.JobService.JobStatusCheckRequested
fail: MassTransit.ReceiveTransport[0]
T-FAULT rabbitmq://localhost/JobAttempt f0cb0000-1616-902e-8129-08d97cca994f
System.Threading.Tasks.TaskCanceledException: A task was canceled.
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContext`2.Delete(SagaConsumeContext`1 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.MissingSagaPipe`2.Send(SagaConsumeContext`2 context)
at MassTransit.Saga.SendSagaPipe`2.Send(SagaRepositoryContext`2 context)
at MassTransit.Saga.InMemoryRepository.InMemorySagaRepositoryContextFactory`1.Send[T](ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Saga.Pipeline.Filters.CorrelatedSagaFilter`2.GreenPipes.IFilter<MassTransit.ConsumeContext<TMessage>>.Send(ConsumeContext`1 context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.InMemoryOutboxFilter`2.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RetryFilter`1.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Partitioning.Partition.Send[T](T context, IPipe`1 next)
at GreenPipes.Filters.TeeFilter`1.<>c__DisplayClass5_0.<<Send>g__SendAsync|1>d.MoveNext()
--- End of stack trace from previous location ---
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.OutputPipeFilter`2.SendToOutput(IPipe`1 next, TOutput pipeContext)
at GreenPipes.Filters.DynamicFilter`1.<>c__DisplayClass10_0.<<Send>g__SendAsync|0>d.MoveNext()
--- End of stack trace from previous location ---
at MassTransit.Pipeline.Filters.DeserializeFilter.Send(ReceiveContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at GreenPipes.Filters.RescueFilter`2.GreenPipes.IFilter<TContext>.Send(TContext context, IPipe`1 next)
at MassTransit.Pipeline.Filters.DeadLetterFilter.GreenPipes.IFilter<MassTransit.ReceiveContext>.Send(ReceiveContext context, IPipe`1 next)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.Transports.ReceivePipeDispatcher.Dispatch(ReceiveContext context, ReceiveLockContext receiveLock)
at MassTransit.RabbitMqTransport.Pipeline.RabbitMqBasicConsumer.<>c__DisplayClass24_0.<<HandleBasicDeliver>b__0>d.MoveNext()
Producer startup:
services.AddMassTransit(x =>
{
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.ConfigureEndpoints(context);
});
});
services.AddMassTransitHostedService();
Consumer startup:
services.AddMassTransit(x =>
{
x.AddDelayedMessageScheduler();
x.AddConsumer<LoanRequestJobConsumer>(cfg =>
{
cfg.Options<JobOptions<LoanRequestBroker>>(options =>
{
options.SetJobTimeout(TimeSpan.FromMinutes(5));
options.SetConcurrentJobLimit(10);
});
});
x.SetKebabCaseEndpointNameFormatter();
x.UsingRabbitMq((context, cfg) =>
{
cfg.UseDelayedMessageScheduler();
cfg.ServiceInstance(instance =>
{
instance.ConfigureJobServiceEndpoints(js =>
{
js.SagaPartitionCount = 1;
js.FinalizeCompleted = true;
});
cfg.ReceiveEndpoint("loan-request-processing", e =>
{
e.ConfigureConsumer<LoanRequestJobConsumer>(context);
});
instance.ConfigureEndpoints(context);
});
});
});
services.AddMassTransitHostedService();
Job consumer
public class LoanRequestJobConsumer : IJobConsumer<LoanRequestBroker>
{
private readonly ILogger<LoanRequestJobConsumer> _logger;
private readonly ILoanProcessingService _processingService;
public LoanRequestJobConsumer(
ILogger<LoanRequestJobConsumer> logger,
ILoanProcessingService processingService)
{
_logger = logger;
_processingService = processingService;
}
public async Task Run(JobContext<LoanRequestBroker> context)
{
_logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: start processing loan request id = {context.Job.Id}");
var processingInfo = new LoanProcessingInfo
{
Status = TaskStatus.InProgress,
LoanRequest = context.Job.Adapt<LoanRequest>()
};
processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);
processingInfo = await _processingService.ProcessAsync(processingInfo);
processingInfo = await _processingService.SaveProcessingInfoAsync(processingInfo);
_logger.LogInformation($"{nameof(LoanRequestJobConsumer)}: end processing loan request id = {context.Job.Id}"
$"\nResult: {JsonConvert.SerializeObject(processingInfo)}");
}
}
How I push item to the queue
var endpoint = await _sendEndpointProvider.GetSendEndpoint(_brokerEndpoints.LoanProcessingQueue);
await endpoint.Send(loanRequest.Adapt<LoanRequestBroker>());
CodePudding user response:
If I had to guess, without any other error log details, I would think that the delayed exchange plug-in is not installed/enabled on RabbitMQ.