camel pollEnrich is not working for the second time

Time:08-12

I am reading and processing two files from two different file locations and comparing their content. If the second file is not available, the rest of the process should run with the first file only. If the second file is available, the comparison should happen. I am using Camel's pollEnrich for this, but Camel picks up the second file only the first time. After that, the second file is not picked up even when it is present, unless I restart the route. After a restart it works once, and then it stops picking up the second file again.

I move the files to different locations after processing them.

Here is the relevant code:

from("sftp:" + firstFileLocation + "?privateKeyFile=" + ppkFileLocation + "&username=" + sftpUsername
            + "&readLock=changed&idempotent=true&move=" + firstFileArchiveLocation)
    .pollEnrich("sftp:" + secondFileLocation + "?privateKeyFile=" + ppkFileLocation + "&username=" + sftpUsername
            + "&readLock=changed&idempotent=true&fileExist=Ignore&move=" + secondFileLocationArchive, 10000, new FileAggregationStrategy())
    .routeId("READ_INPUT_FILE_ROUTE");

Need help.

CodePudding user response:

You're setting idempotent=true on the SFTP consumers, which means Camel will not process the same file name twice. Since you're moving the files after processing, a file that later appears under the same name is a new file, so it would make sense to set idempotent=false.

Quoted from the Camel documentation:

Option to use the Idempotent Consumer EIP pattern to let Camel skip already processed files. Will by default use a memory based LRUCache that holds 1000 entries. If noop=true then idempotent will be enabled as well to avoid consuming the same files over and over again.
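To make the effect concrete, here is a minimal plain-Java sketch of a memory-based LRU store of already-seen keys. It is not Camel's implementation: the class name SeenFilesSketch and its shouldConsume method are hypothetical and only mimic the behaviour described in the quote, assuming the key is the file name. A key that has been seen once is skipped until it is evicted or the store is recreated, and recreating an in-memory store is what a route restart amounts to.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a memory-based idempotent store; not Camel's own class.
class SeenFilesSketch {
    private final Map<String, Boolean> seen;

    SeenFilesSketch(final int capacity) {
        // accessOrder=true gives LRU ordering; the eldest entry is dropped once capacity is exceeded.
        this.seen = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > capacity;
            }
        };
    }

    // Returns true if the key is new (the file would be consumed), false if it would be skipped.
    boolean shouldConsume(String fileKey) {
        return seen.put(fileKey, Boolean.TRUE) == null;
    }

    public static void main(String[] args) {
        SeenFilesSketch store = new SeenFilesSketch(1000);
        System.out.println(store.shouldConsume("second.csv")); // first arrival
        System.out.println(store.shouldConsume("second.csv")); // same name again, after the move
        // A fresh store models a restarted route with an in-memory repository.
        System.out.println(new SeenFilesSketch(1000).shouldConsume("second.csv"));
    }
}
```

Running main prints true, false, true, which matches the symptom in the question: the second file is picked up once, skipped afterwards, and picked up again only after a restart.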

CodePudding user response:

I'm adding an alternative solution based on the comments on the answer posted by Jeremy Ross. My answer is based on the following code example. For brevity, I've included only the configure() method of the test route.

    @Override
    public void configure() throws Exception {

        String firstFileLocation = "//127.0.0.1/Folder1";
        String secondFileLocation = "//127.0.0.1/Folder2";
        String ppkFileLocation = "./key.pem";
        String sftpUsername = "user";
        String sftpPassword = "xxxxxx";
        String firstFileArchiveLocation = "./Archive1";
        String secondFileLocationArchive = "./Archive2";

        IdempotentRepository repository1 = MemoryIdempotentRepository.memoryIdempotentRepository(1000);
        IdempotentRepository repository2 = MemoryIdempotentRepository.memoryIdempotentRepository(1000);

        getCamelContext().getRegistry().bind("REPO1", repository1);
        getCamelContext().getRegistry().bind("REPO2", repository2);

        from("sftp:" + firstFileLocation
                + "?password=" + sftpPassword + "&username=" + sftpUsername
                + "&readLock=idempotent&idempotent=true&idempotentKey=\\${file:name}-\\${file:size}-\\${file:modified}"
                + "&idempotentRepository=#REPO1&stepwise=true&download=true&delay=10&move=" + firstFileArchiveLocation)
                .to("direct:combined");

        from("sftp:" + secondFileLocation
                + "?password=" + sftpPassword + "&username=" + sftpUsername
                + "&readLock=idempotent&idempotent=true&idempotentKey=\\${file:name}-\\${file:size}-\\${file:modified}"
                + "&idempotentRepository=#REPO2"
                + "&stepwise=true&delay=10&move=" + secondFileLocationArchive)
                .to("direct:combined");

        from("direct:combined")
                .aggregate(constant(true), (oldExchange, newExchange) -> {
                    if (oldExchange == null) {
                        oldExchange = newExchange;
                    }
                    String fileName = (String) newExchange.getIn().getHeaders().get("CamelFileName");
                    String filePath = (String) newExchange.getIn().getHeaders().get("CamelFileAbsolutePath");

                    if (filePath.contains("Folder1")) {
                        oldExchange.getIn().setHeader("File1", fileName);
                    } else {
                        oldExchange.getIn().setHeader("File2", fileName);

                    }

                    String file1Name = oldExchange.getIn().getHeader("File1", String.class);
                    String file2Name = oldExchange.getIn().getHeader("File2", String.class);

                    if (file1Name != null && file2Name != null) {

                        // Compare files
                        // Both files are available

                        oldExchange.getIn().setHeader("PROCEED", true);

                    } else if (file1Name != null) {
                        // No comparison, proceed with File 1
                        oldExchange.getIn().setHeader("PROCEED", true);
                    } else {

                        // Do not proceed, keep file 2 data and wait for File 1
                        oldExchange.getIn().setHeader("PROCEED", false);
                    }


                    String fileName1 = oldExchange.getIn().getHeader("File1", String.class);
                    String fileName2 = oldExchange.getIn().getHeader("File2", String.class);

                    oldExchange.getIn().setBody("File1: " + fileName1 + " File2: " + fileName2);

                    System.out.println(oldExchange);
                    return oldExchange;
                }).completion(exchange -> {
                    if(exchange.getIn().getHeader("PROCEED", Boolean.class)) {
                        exchange.getIn().removeHeader("File1");
                        exchange.getIn().removeHeader("File2");
                        return true;
                    }
                    return false;
                }).to("log:Test");


    }

This solution uses two SFTP consumers instead of pollEnrich, since we need to capture file changes in both SFTP locations. Each consumer has its own idempotent repository and an idempotent key built from the file name, size, and modification time, so that duplicates are ignored. I've also used the same idempotent repository as the read-lock store (readLock=idempotent), assuming only Camel routes access the files.
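As a rough illustration of why the key includes size and modification time, here is a small plain-Java sketch. The class IdempotentKeySketch, its key helper, and the sample values are hypothetical, and a plain HashSet stands in for the repository. It only mirrors the name-size-modified pattern of the idempotentKey option in the routes above.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical illustration of a composite idempotent key; not Camel code.
class IdempotentKeySketch {
    // Joins name, size and modification time the same way the idempotentKey pattern does.
    static String key(String name, long size, long modifiedMillis) {
        return name + "-" + size + "-" + modifiedMillis;
    }

    public static void main(String[] args) {
        Set<String> repository = new HashSet<>();
        // Set.add returns false when the key is already present, i.e. the file is a duplicate.
        System.out.println(repository.add(key("data.csv", 120L, 1000L))); // new file
        System.out.println(repository.add(key("data.csv", 120L, 1000L))); // identical file seen again
        System.out.println(repository.add(key("data.csv", 120L, 2000L))); // same name, newer timestamp
    }
}
```

This prints true, false, true: a file that reappears under the same name but with a different modification time produces a new key, so it is not treated as a duplicate.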

After the SFTP consumers receive the files, they send them to the direct:combined endpoint, which routes each exchange to an aggregator.

The example aggregation strategy stores the file names in exchange headers. Based on the file information in those headers, the aggregator decides how to process the file and whether to proceed with the exchange. If only file 2 has been received, the exchange should not proceed to the next stages or routes.

Finally, the completion predicate decides whether to complete the aggregation and log the exchange body, based on the PROCEED header set by the aggregator. The predicate also contains an example clean-up step that removes the File1 and File2 headers.
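The branching that sets the PROCEED header can be isolated as a plain function, which makes it easy to check outside Camel. The sketch below is only an extraction of the logic from the aggregation strategy above; the class name ProceedDecisionSketch and the method shouldProceed are hypothetical.

```java
// Hypothetical extraction of the PROCEED decision from the aggregation strategy; not Camel code.
class ProceedDecisionSketch {
    // A null argument means that file has not been seen in the current aggregation group.
    static boolean shouldProceed(String file1Name, String file2Name) {
        if (file1Name != null && file2Name != null) {
            return true;  // both files present: compare, then proceed
        } else if (file1Name != null) {
            return true;  // only file 1: proceed without comparison
        }
        return false;     // only file 2: keep it and wait for file 1
    }

    public static void main(String[] args) {
        System.out.println(shouldProceed("a.csv", "b.csv")); // both files
        System.out.println(shouldProceed("a.csv", null));    // file 1 only
        System.out.println(shouldProceed(null, "b.csv"));    // file 2 only
    }
}
```

This prints true, true, false. One consequence worth noting: because a group that starts with file 1 completes immediately, the comparison branch is reached only when file 2 arrived first. If file 1 should wait for file 2 for a while, a completion timeout on the aggregator may be worth considering.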

I hope this example conveys the basic idea of using an aggregator here.
