Home > Mobile >  Spring Integration - Poller does not process next message on MessageHandlingException from JobLaunch
Spring Integration - Poller does not process next message on MessageHandlingException from JobLaunch

Time:08-17

Looking for suggestions on error handling in my Spring Integration/Spring Batch application.

Context

I am using an inbound file adapter to poll for files in the input directory. The file is passed as a parameter to a Spring Batch job via the JobLaunchingGateway. The job is run on its own thread pool using a TaskExecutor.

The files are moved to a processed directory when the job completes or to the error directory when the job execution fails.

The poller uses the AcceptOnceFileListFilter which does not retain processed files between application restarts due to the cache being in-memory. This is not a concern for my application since Spring Batch would throw an exception if the same file shows up in the input directory at a later point in time.

Issue

Day 1 - The input file abc_success005.csv is processed successfully.

Several days later (application has been restarted since Day 1) - When the same file arrived at the input directory erroneously, I noticed that the poller thread received an error from the JobLaunchingGateway and it did not proceed to process the subsequent messages in the current polling cycle.

Note that the poller resumed duties and continued operations on schedule after the polling interval expired.

The SimpleJobLauncher executes the job in a separate thread pool using the TaskExecutor. However, the creation of the job execution uses the caller's thread, which is the poller in my case.

SimpleJobLauncher - From Spring batch source code

jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);

    try {
        taskExecutor.execute(new Runnable() {

            @Override
            public void run() {
                try {
                    if (logger.isInfoEnabled()) {
                        logger.info("Job: ["   job   "] launched with the following parameters: ["   jobParameters
                                  "]");
                    }
                    job.execute(jobExecution);

My Integration flow

@Bean
public IntegrationFlow myIntegrationFlow(JobLaunchingGateway jobLaunchingGateway,
                                             FileMessageToJobRequest fileMessageToJobRequest) {
    return IntegrationFlows.from(Files.inboundAdapter(new File(properties.getInputDir()))
                    .filter(new AcceptOnceFileListFilter<>()),
            c -> c.poller(Pollers.fixedRate(300, TimeUnit.SECONDS)
                            .taskExecutor(taskExecutor())
                            .maxMessagesPerPoll(50)
                    ))
            .transform(fileMessageToJobRequest)
            .handle(jobLaunchingGateway)
            .log(LoggingHandler.Level.WARN, "headers.id   ': '   payload")
            .get();
}

Stack Trace

[ERROR] 2022-08-15 11:59:46,853 o.s.i.h.LoggingHandler error taskExecutor-2id taskExecutor-2 - org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input_file_name=/var/tmp/batch/input/abc_success005.csv}.  If you want to run this job again, change the parameters., failedMessage=GenericMessage [payload=JobLaunchRequest: processStuffJob, parameters={input_file_name=/var/tmp/batch/input/abc_success005.csv}, headers={file_originalFile=/var/tmp/batch/input/abc_success005.csv, id=298019a3-b548-2a26-1469-78394758d824, file_name=abc_success005.csv, file_relativePath=abc_success005.csv, timestamp=1660579186811}]
    at org.springframework.batch.integration.launch.JobLaunchingGateway.handleRequestMessage(JobLaunchingGateway.java:78)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:457)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:325)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:268)
    at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:232)
    at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:142)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:196)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.messageReceived(AbstractPollingEndpoint.java:475)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:461)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.pollForMessage(AbstractPollingEndpoint.java:413)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.lambda$createPoller$4(AbstractPollingEndpoint.java:348)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.lambda$execute$0(ErrorHandlingTaskExecutor.java:57)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException: A job instance already exists and is complete for parameters={input_file_name=/var/tmp/batch/input/abc_success005.csv}.  If you want to run this job again, change the parameters.
at org.springframework.batch.core.repository.support.SimpleJobRepository.createJobExecution(SimpleJobRepository.java:139)

The poller thread typically fetches a bunch of files/messages and sends it to the JobLaunchingGateway for processing. When the job fails to launch, the poller does not handle the exception and hence the processing of the rest of the file terminates.

I understand that this doesn't stop the poller from resuming next cycle but it is not desirable for one bad file to end the current polling cycle. This can also lead to situations where one bad file (when left as is) as potentially cause the issue in repeat in several following polling cycles.

Form Spring Integration source code - AbstractPollingEndpoint

    private Runnable createPoller() {
    return () ->
            this.taskExecutor.execute(() -> {
                int count = 0;
                while (this.initialized && (this.maxMessagesPerPoll <= 0 || count < this.maxMessagesPerPoll)) {
                    if (this.maxMessagesPerPoll == 0) {
                        logger.info("Polling disabled while 'maxMessagesPerPoll == 0'");
                        break;
                    }
                    if (pollForMessage() == null) {
                        break;
                    }
                    count  ;
                }
            });
}

What is the recommended solution in this case?

One option I can think of is to extend the JobLaunchingGateway and deal with the file (move to error directory) log/swallow this exception. From a functional standpoint, there is not much to be done here other than moving the file out log the error.

I am writing this to seek better solutions. I think it is not desirable for one bad file to starve the execution of other files (even within a polling cycle)

CodePudding user response:

The poller thread typically fetches a bunch of files/messages and sends it to the JobLaunchingGateway for processing.

That's not correct. See FileReadingMessageSource. It does fetch on a first request:

protected AbstractIntegrationMessageBuilder<File> doReceive() {
    // rescan only if needed or explicitly configured
    if (this.scanEachPoll || this.toBeReceived.isEmpty()) {
        scanInputDirectory();
    }

    File file = this.toBeReceived.poll();

And we produce only one from that queue to the mentioned if (pollForMessage() == null) {

That's true that current polling cycle is cancelled in case of error, but we don't ignore the other cached files. We just comes back to the next this.toBeReceived.poll() in the next polling cycle.

It just was designed that way day first: the maxMessagesPerPoll is a part of single polling cycle unit of work. As well as all the messages we poll from a single executor's task. So, failure somewhere in the middle of task is like a task cancelling.

If you still find this as an inappropriate, see if maxMessagesPerPoll = 1 is OK for you, so you single polling cycle is going to be for a single file.

Another solution is to catch an exception on that .handle(jobLaunchingGateway) and don't let it pass back to the poller. See an ExpressionEvaluatingRequestHandlerAdvice: https://docs.spring.io/spring-integration/docs/current/reference/html/messaging-endpoints.html#message-handler-advice-chain

  • Related