I'm trying to rename files on a remote SFTP server at the end of a transaction, using Spring Integration with Spring Boot. The official documentation provides examples that use a TransactionSynchronizationFactory with SpEL expressions similar to:
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    SpelExpressionParser spelParser = new SpelExpressionParser();
    processor.setAfterCommitExpression(spelParser.parseRaw(
        "payload.renameTo(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'], headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'] + '.PASSED')"));
    processor.setAfterRollbackExpression(spelParser.parseRaw(
        "payload.renameTo(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'], headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'] + '.FAILED')"));
    return new DefaultTransactionSynchronizationFactory(processor);
}
In my implementation, the payload of the sftpAdapter is an InputStream rather than a File object. The transaction itself works, but the expression evaluation fails in both the afterCommit and afterRollback operations, so the files are not renamed. The following error is raised from org.springframework.expression.spel.standard.SpelExpression#getValue(org.springframework.expression.EvaluationContext, java.lang.Object):
EL1004E: Method call: Method rename(java.lang.String,java.lang.String) cannot be found on type com.jcraft.jsch.ChannelSftp$2
Here is my implementation:
@Bean
@InboundChannelAdapter(channel = "sftp-inChannel",
        poller = @Poller(value = "pollerMetadata"),
        autoStartup = "${sftp.autoStartup:true}")
public MessageSource<InputStream> ftpMessageSource() {
    SftpStreamingMessageSource source = new SftpStreamingMessageSource(sftpRemoteFileTemplate());
    source.setRemoteDirectory(path);
    source.setFilter(chainFilter());
    source.setMaxFetchSize(maxFetchSize);
    return source;
}
@Bean
public PollerMetadata pollerMetadata() {
    return Pollers.fixedRate(delayInMillisec)
            .maxMessagesPerPoll(maxFetchSize)
            .advice(transactionInterceptor())
            .transactionSynchronizationFactory(transactionSynchronizationFactory())
            .transactional()
            .get();
}
@Bean
public TransactionSynchronizationFactory transactionSynchronizationFactory() {
    ExpressionEvaluatingTransactionSynchronizationProcessor processor = new ExpressionEvaluatingTransactionSynchronizationProcessor();
    SpelExpressionParser spelParser = new SpelExpressionParser();
    processor.setAfterCommitExpression(spelParser.parseRaw(
        "payload.rename(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'], headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'] + '.PASSED')"));
    processor.setAfterRollbackExpression(spelParser.parseRaw(
        "payload.rename(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'], headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'] + '.FAILED')"));
    return new DefaultTransactionSynchronizationFactory(processor);
}
I can see that ChannelSftp has a rename(String, String) method, and I assumed it was the payload's type, but it seems SpEL cannot resolve the payload of the GenericMessage as a ChannelSftp and therefore cannot see rename(String, String). I spent a day searching the documentation without success; any help would be greatly appreciated.
Thanks
Answer:
Your problem is here: com.jcraft.jsch.ChannelSftp$2. Pay attention to that $2. This is no longer a ChannelSftp but an internal, anonymous InputStream for the remote file, and that is exactly what SftpStreamingMessageSource produces. It returns neither files nor a ChannelSftp, so you cannot call rename() on the InputStream.
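To see why the $2 suffix matters, here is a small standalone sketch using only the JDK. It is not jsch code: FakeChannel and AnonymousStreamDemo are hypothetical names standing in for a channel class that, like ChannelSftp, hands out an anonymous InputStream. The stream's runtime type is a separate class that does not carry the rename method of the class that created it, which is what SpEL reports in the error above.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.util.Arrays;

public class AnonymousStreamDemo {

    // Hypothetical stand-in for a channel class; this is NOT the jsch API.
    public static class FakeChannel {

        public void rename(String from, String to) {
            // no-op, only here so the method exists on the channel type
        }

        public InputStream get(String path) {
            // Anonymous subclass: the compiler names it AnonymousStreamDemo$FakeChannel$1
            return new ByteArrayInputStream(new byte[0]) { };
        }
    }

    // True if the given type exposes a public method with the given name.
    public static boolean hasMethod(Class<?> type, String name) {
        return Arrays.stream(type.getMethods())
                .anyMatch(m -> m.getName().equals(name));
    }

    public static void main(String[] args) {
        InputStream payload = new FakeChannel().get("/remote/file.txt");

        // prints AnonymousStreamDemo$FakeChannel$1
        System.out.println(payload.getClass().getName());
        // prints false: the stream has no rename method
        System.out.println(hasMethod(payload.getClass(), "rename"));
        // prints true: only the channel type has it
        System.out.println(hasMethod(FakeChannel.class, "rename"));
    }
}
```

The same reasoning applies to the message payload here: method resolution works on the runtime type of the payload, so a rename call has to target an object that actually declares that method.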
Consider using the special IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE header instead. It holds an instance of org.springframework.integration.file.remote.session.Session, which already has the required rename(String pathFrom, String pathTo) method. But again: this performs the rename on the remote file:
processor.setAfterCommitExpression(spelParser.parseRaw(
    "headers.closeableResource.rename(headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'], headers['file_remoteDirectory'] + '/' + headers['file_remoteFile'] + '.PASSED')"));