I have created a flow that reads two files as input, copies them to a 'work' directory, and then transforms them into a JobRequest. After the batch job is launched, the payload is a JobExecution whose job parameters contain the two files. I then want to get the two files from the JobExecution and turn them into two GenericMessages with a splitter. Finally, I run a subflow that writes these files to another 'archive' directory. The program throws an exception:
2022-04-25 14:56:39.974 ERROR 4556 --- [ scheduling-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageHandlingException: failed to write Message payload to file in the [bean 'startBatchFlow.subFlow#2.file:outbound-gateway#0' for component 'startBatchFlow.subFlow#2.org.springframework.integration.config.ConsumerEndpointFactoryBean#0'; defined in: 'class path resource [/config/FilesReaderIntegrationConfig.class]'; from source: 'bean method startBatchFlow']; nested exception is java.nio.file.FileSystemException: C:\dev\Fichiers\test\work\Fonds.xlsx -> C:\dev\Fichiers\test\archive\Fonds.xlsx: Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus, failedMessage=GenericMessage [payload=C:\dev\Fichiers\test\work\Fonds.xlsx, headers={sequenceNumber=1, file_name=Fonds.xlsx, sequenceSize=0, correlationId=14ecf13d-c99f-eff1-ae60-31400d606c2d, file_originalFile=C:\dev\Fichiers\test\work\Fonds.xlsx, id=16c09968-1833-d430-1542-ea1959364565, file_relativePath=Fonds.xlsx, timestamp=1650889605032}]
at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
at org.springframework.integration.file.FileWritingMessageHandler.handleRequestMessage(FileWritingMessageHandler.java:543)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:136)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.router.AbstractMessageRouter.doSend(AbstractMessageRouter.java:213)
at org.springframework.integration.router.AbstractMessageRouter.handleMessageInternal(AbstractMessageRouter.java:195)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
Here is my code:
@Bean
public IntegrationFlow startBatchFlow() throws InterruptedException {
    return IntegrationFlows //
            .from(fileReadingMessageSource(), c -> c.poller(Pollers.fixedDelay(5000))) //
            .handle(fileWritingMessageHandler())
            .aggregate(agg -> agg.correlationExpression("payload != null")
                    .releaseStrategy(new ExpressionEvaluatingReleaseStrategy("size == 2"))
            )
            .transform(fileMessageToJobRequest()) //
            .handle(jobLaunchingMessageHandler()) //
            .wireTap(sf -> {
                sf.handle(jobExecution ->
                        System.out.println("job Execution payload " + jobExecution.getPayload()));
            })
            .split(new MessageSplitter())
            .wireTap(sf -> {
                sf.handle(jobExecution ->
                        System.out.println("Message after split payload = " + jobExecution.getPayload()));
            })
            .route(Message.class, message -> message.getPayload() != null,
                    mapping -> mapping.subFlowMapping(true, archiveFilesFlow())
            )
            .get();
}

public IntegrationFlow archiveFilesFlow() {
    return f -> f
            .handle(fileWritingInArchive())
            .handle(message -> {
                System.out.println("here in archive flow " + message.getPayload());
            });
}

public FileWritingMessageHandler fileWritingInArchive() {
    FileWritingMessageHandler fileWritingMessageHandler = new FileWritingMessageHandler(new File(archivePath));
    // Delete the file from the 'work' directory once it has been written to 'archive'.
    fileWritingMessageHandler.setDeleteSourceFiles(true);
    fileWritingMessageHandler.setExpectReply(true);
    return fileWritingMessageHandler;
}
And the MessageSplitter:
@Service
public class MessageSplitter extends AbstractMessageSplitter {

    @Override
    protected Object splitMessage(Message<?> message) {
        // Each job parameter value is expected to be the path of one input file.
        List<File> messages = new ArrayList<>();
        ((JobExecution) message.getPayload()).getJobParameters().getParameters().forEach((key, jobParam) -> {
            messages.add(new File((String) jobParam.getValue()));
        });
        Iterator<MessageBuilder<File>> builderIterator = new Iterator<MessageBuilder<File>>() {
            private File next;
            private int index = 0;

            @Override
            public boolean hasNext() {
                if (this.next != null) { // handle multiple hasNext() calls.
                    return true;
                }
                if (index < messages.size()) {
                    this.next = messages.get(index);
                    index++;
                    // Return true for the last element as well, so that it is not dropped.
                    return true;
                }
                return false;
            }

            @Override
            public MessageBuilder<File> next() {
                File message = this.next;
                this.next = null;
                return MessageBuilder
                        .withPayload(message).setHeader("file_name", message.getName())
                        .setHeader("file_originalFile", message)
                        .setHeader("file_relativePath", message.getName());
            }
        };
        return builderIterator;
    }
}
Thanks in advance.
CodePudding user response:
There is an exception in your logs:
Le processus ne peut pas accéder au fichier car ce fichier est utilisé par un autre processus
Which translates from French as:
The process cannot access the file because this file is in use by another process
So your C:\dev\Fichiers\test\work\Fonds.xlsx has been opened somewhere but not closed.
Perhaps your job does something with the file content but does not close the file at the end. Unix-like operating systems are forgiving about this, but Windows does not let you move or delete a file while another handle to it is still open.
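As a minimal sketch of the idea (plain java.nio, not your actual job; the class name CloseBeforeMove and its two methods are hypothetical), the code below reads a file inside a try-with-resources block so the handle is released before the file is moved to the archive directory:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, for illustration only.
public class CloseBeforeMove {

    // Reads the whole file and returns the number of bytes read.
    // try-with-resources closes the stream even if read() throws.
    public static long readAndClose(Path source) throws IOException {
        long count = 0;
        try (InputStream in = Files.newInputStream(source)) {
            byte[] buffer = new byte[8192];
            int n;
            while ((n = in.read(buffer)) != -1) {
                count += n;
            }
        }
        return count;
    }

    // Moves the file into archiveDir, keeping its file name.
    public static Path archive(Path source, Path archiveDir) throws IOException {
        Files.createDirectories(archiveDir);
        return Files.move(source, archiveDir.resolve(source.getFileName()),
                StandardCopyOption.REPLACE_EXISTING);
    }

    public static void main(String[] args) throws IOException {
        Path work = Files.createTempDirectory("work");
        Path archiveDir = Files.createTempDirectory("archive");
        Path file = Files.write(work.resolve("Fonds.xlsx"), new byte[] {1, 2, 3});

        System.out.println("bytes read: " + readAndClose(file));
        System.out.println("archived to: " + archive(file, archiveDir));
    }
}
```

If whatever reads the file in your job (an ItemReader, a workbook, a stream) is closed in the same way before the JobExecution is returned, the outbound gateway with deleteSourceFiles should be able to move the file on Windows as well.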