Moving a file from a directory to a success directory or an error directory with Spring Integration

Time:09-07

I am trying to implement a Spring Integration configuration that picks up an .xml file, parses it, and then moves it to an "archived" directory if it is valid or to an error directory if it is not.


import com.nagarro.studentapi.integration.queue.StudentSender;
import com.nagarro.studentapi.util.XmlParser;
import org.aopalliance.aop.Advice;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.annotation.InboundChannelAdapter;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.core.MessageSource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.FileHeaders;
import org.springframework.integration.file.FileReadingMessageSource;
import org.springframework.integration.file.FileWritingMessageHandler;
import org.springframework.integration.file.filters.SimplePatternFileListFilter;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;
import org.springframework.integration.handler.advice.ExpressionEvaluatingRequestHandlerAdvice;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHandler;
import org.springframework.messaging.MessagingException;

import java.io.File;

@Configuration
@EnableIntegration
public class IntegrationConfiguration {

    private static final String XML = "*.xml";
    private static final String STUDENT = "\\student.xml";

    @Value("${student-api.xmlPath}")
    private String inputPath;
    @Value("${student-api.archivedDestination}")
    private String successPath;
    @Value("${student-api.errorDestination}")
    private String errorPath;

    @Bean
    public MessageChannel messageChannel() {
        return new DirectChannel();
    }

    @Bean
    @InboundChannelAdapter(value = "messageChannel")
    public MessageSource<File> messageProducer() {
        FileReadingMessageSource messageSource = new FileReadingMessageSource();
        messageSource.setDirectory(new File(inputPath));
        messageSource.setFilter(new SimplePatternFileListFilter(XML));
        return messageSource;
    }

    @Bean
    @ServiceActivator(inputChannel = "messageChannel")
    public MessageHandler handler() {
        FileWritingMessageHandler handler = new FileWritingMessageHandler(new File(successPath));
        handler.setFileExistsMode(FileExistsMode.REPLACE);
        handler.setExpectReply(false);
        return handler;
    }

    @Bean
    public IntegrationFlow integrationFlow(XmlParser xmlParser) {
        return IntegrationFlows.from(messageProducer(), spec -> spec.poller(Pollers.fixedDelay(1000)))
                .enrichHeaders(h -> h.headerExpression(FileHeaders.ORIGINAL_FILE, "payload"))
                .convert(String.class)
                .transform((String path) -> xmlParser.parsePath(path))
                .handle("xmlParser", "parsePath", e -> e.advice(errorAdvice()))
                .get();
    }

    @Bean
    public AbstractRequestHandlerAdvice errorAdvice() {
        return new AbstractRequestHandlerAdvice() {

            @Override
            protected Object doInvoke(ExecutionCallback callback, Object target, Message<?> message) {
                File file = message.getHeaders().get(FileHeaders.ORIGINAL_FILE, File.class);
                try {
                    Object result = callback.execute();
                    file.renameTo(new File(successPath, STUDENT));
                    System.out.println("File renamed after success");
                    return result;
                }
                catch (Exception e) {
                    file.renameTo(new File(errorPath, STUDENT));
                    System.out.println("File renamed after failure");
                    throw e;
                }
            }
        };
    }
}

However, whenever callback.execute() is called I get this error, and I don't quite understand why.

2022-09-06 18:20:07.971 ERROR 32152 --- [   scheduling-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@1135e3d6]; nested exception is java.lang.IllegalArgumentException: No candidate methods found for messages., failedMessage=GenericMessage [payload=Student(firstname=John, lastname=Dose, cnp=123, birthDate=2000-12-12, address=Address(street=a, number=1, city=Craiova, country=Romania), grades=[Grade(discipline=a, date=2021-12-12, grade=10), Grade(discipline=b, date=2021-12-12, grade=9)]), headers={....

Although I have a message handler, I suspect the problem is that I do not override the handle method, but I am unsure how to do that.

CodePudding user response:

You have several problems:

  1. You use both @InboundChannelAdapter and IntegrationFlows.from(messageProducer(), ...). This creates two independent polling endpoints for the same source.
  2. The @ServiceActivator is an endpoint that writes the file that has just been read by one of those sources.
  3. There is no connection between the @InboundChannelAdapter, what you expect from your @ServiceActivator, and that flow.
  4. You have .transform((String path) -> xmlParser.parsePath(path)) immediately followed by .handle("xmlParser", "parsePath"). The two look essentially the same, but the combination does not make sense: parsePath() is called twice with different payloads, and the second call receives the result of the first.
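
One way to act on the four points above is to keep a single polling flow and call parsePath() only once. The following is a rough sketch, not a verified fix. It assumes Spring Integration 5.x, assumes that parsePath() accepts a String path (the question does not show its signature), and reuses the errorAdvice bean from the question unchanged. The class name SingleFlowConfiguration and the bean name xmlFileFlow are made up for illustration. The @InboundChannelAdapter and @ServiceActivator beans would be removed.

```java
import java.io.File;

import com.nagarro.studentapi.util.XmlParser;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.integration.file.dsl.Files;
import org.springframework.integration.handler.advice.AbstractRequestHandlerAdvice;

@Configuration
public class SingleFlowConfiguration {

    @Value("${student-api.xmlPath}")
    private String inputPath;

    // errorAdvice is the advice bean from the question, injected by type.
    @Bean
    public IntegrationFlow xmlFileFlow(XmlParser xmlParser, AbstractRequestHandlerAdvice errorAdvice) {
        return IntegrationFlows
                // The only polling endpoint for the input directory.
                .from(Files.inboundAdapter(new File(inputPath)).patternFilter("*.xml"),
                        e -> e.poller(Pollers.fixedDelay(1000)))
                // The file source already sets the FileHeaders.ORIGINAL_FILE header,
                // so the advice can still find the File after this conversion.
                .transform(File.class, File::getAbsolutePath)
                // Single call to parsePath (assumed to take a String), wrapped by the advice.
                .handle(xmlParser, "parsePath", e -> e.advice(errorAdvice))
                // Discard the parsed result so the flow does not need a reply channel.
                .channel("nullChannel")
                .get();
    }
}
```

With this shape the advice wraps exactly one parsePath() invocation, so the success or error move happens once per file.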

Please revise your logic carefully: right now parts of your configuration are misleading and error-prone. I believe the error you got is because parsePath() expects a String, not the Student that we see in the payload for that handle().
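
To see why a Student payload leaves no method to call, here is a simplified analogy in plain Java. It is not Spring Integration's actual lookup code. Student, XmlParser and findCandidate are hypothetical stand-ins, and parsePath(String) is an assumed signature.

```java
import java.lang.reflect.Method;
import java.util.Optional;

final class CandidateMethodDemo {

    // Hypothetical stand-in for the Student payload seen in the log.
    static final class Student {
        final String firstname;

        Student(String firstname) {
            this.firstname = firstname;
        }
    }

    // Hypothetical stand-in for XmlParser; assumes parsePath takes a String.
    static final class XmlParser {
        public Student parsePath(String path) {
            return new Student("John");
        }
    }

    // Returns a public one-argument method with the given name whose
    // parameter type accepts the payload, if there is one.
    static Optional<Method> findCandidate(Object target, String name, Object payload) {
        for (Method m : target.getClass().getMethods()) {
            if (m.getName().equals(name)
                    && m.getParameterCount() == 1
                    && m.getParameterTypes()[0].isInstance(payload)) {
                return Optional.of(m);
            }
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        XmlParser parser = new XmlParser();

        // First call in the flow: the payload is a String path, so a candidate exists.
        Object first = "student.xml";
        System.out.println(findCandidate(parser, "parsePath", first).isPresent());

        // Second call in the flow: the payload is now the Student returned by the
        // first call, and no parsePath overload accepts it.
        Object second = parser.parsePath("student.xml");
        System.out.println(findCandidate(parser, "parsePath", second).isPresent());
    }
}
```

Running main prints true and then false, which mirrors the situation in the log: the second handle() receives a Student, and no parsePath() overload accepts it.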
