I have this flow that I am trying to test but nothing works as expected. The flow itself works well but testing seems a bit tricky. This is my flow:
@Configuration
@RequiredArgsConstructor
public class FileInboundFlow {

    private final ThreadPoolTaskExecutor threadPoolTaskExecutor;
    private String filePath;

    @Bean
    public IntegrationFlow fileReaderFlow() {
        return IntegrationFlows.from(Files.inboundAdapter(new File(this.filePath))
                        .filterFunction(...)
                        .preventDuplicates(false),
                endpointConfigurer -> endpointConfigurer.poller(
                        Pollers.fixedDelay(500)
                                .taskExecutor(this.threadPoolTaskExecutor)
                                .maxMessagesPerPoll(15)))
                .transform(new UnZipTransformer())
                .enrichHeaders(this::headersEnricher)
                .transform(Message.class, this::modifyMessagePayload)
                .route(Map.class, this::channelsRouter)
                .get();
    }

    private String channelsRouter(Map<String, File> payload) {
        boolean isZip = payload.values()
                .stream()
                .anyMatch(file -> isZipFile(file));
        return isZip ? ZIP_CHANNEL : XML_CHANNEL; // ZIP_CHANNEL and XML_CHANNEL are PublishSubscribeChannel
    }

    @Bean
    public SubscribableChannel xmlChannel() {
        var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
        channel.setBeanName(XML_CHANNEL);
        return channel;
    }

    @Bean
    public SubscribableChannel zipChannel() {
        var channel = new PublishSubscribeChannel(this.threadPoolTaskExecutor);
        channel.setBeanName(ZIP_CHANNEL);
        return channel;
    }

    // There is a @ServiceActivator on each channel
    @ServiceActivator(inputChannel = XML_CHANNEL)
    public void handleXml(Message<Map<String, File>> message) {
        ...
    }

    @ServiceActivator(inputChannel = ZIP_CHANNEL)
    public void handleZip(Message<Map<String, File>> message) {
        ...
    }

    // Plus a @Transformer on the XML_CHANNEL
    @Transformer(inputChannel = XML_CHANNEL, outputChannel = BUS_CHANNEL)
    private List<BusData> xmlFileToIngestionMessagePayload(Map<String, File> xmlFilesByName) {
        return xmlFilesByName.values()
                .stream()
                .map(...)
                .collect(Collectors.toList());
    }
}
I would like to test multiple cases. The first one checks the message payload published on each channel once fileReaderFlow has finished.
So I defined this test class:
@SpringBootTest
@SpringIntegrationTest
@ExtendWith(SpringExtension.class)
class FileInboundFlowTest {

    @Autowired
    private MockIntegrationContext mockIntegrationContext;

    @TempDir
    static Path localWorkDir;

    @BeforeEach
    void setUp() {
        copyFileToTheFlowDir(); // here I copy a file to trigger the flow
    }

    @Test
    void checkXmlChannelPayloadTest() throws InterruptedException {
        Thread.sleep(1000); // waiting for the flow execution
        PublishSubscribeChannel xmlChannel = this.getBean(XML_CHANNEL, PublishSubscribeChannel.class); // I extract the channel to listen to the message sent to it.
        xmlChannel.subscribe(message -> {
            assertThat(message.getPayload()).isInstanceOf(Map.class); // This is never executed
        });
    }
}
As expected, that test does not work because assertThat(message.getPayload()).isInstanceOf(Map.class); is never executed.
After reading the documentation I didn't find any hint to help me solve that issue. Any help would be appreciated! Thanks a lot
CodePudding user response:
First of all, that channel.setBeanName(XML_CHANNEL); does not affect the target bean. You call it during the bean creation phase, and the dependency injection container knows nothing about this setting: it simply does not consult it. If you really want to dictate XML_CHANNEL as the bean name, you'd better look into the @Bean(name) attribute.
The problem in the test is that you are overlooking the asynchronous logic of the flow. Files.inboundAdapter() works on a completely different thread and emits messages outside of your test method. So, even if you could subscribe to the channel in time, before any message is emitted to it, that doesn't mean your test will work correctly: the assertThat() will be performed on a different thread. Therefore its outcome is never reported in the context of your JUnit test method.
So, what I'd suggest is:
- Have Files.inboundAdapter() stopped at the beginning of the test, before any setup you'd like to do in the test. Or at least don't place files into that filePath, so the channel adapter doesn't emit messages.
- Take the channel from the application context and, if you wish, subscribe to it or use a ChannelInterceptor.
- Have an async barrier, e.g. a CountDownLatch, to pass to that subscriber.
- Start the channel adapter or put a file into the directory for scanning.
- Wait for the async barrier before verifying some value or state.
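The barrier part of the steps above can be sketched with plain JDK classes, without any Spring Integration types. This is only an illustration of the pattern, not the actual test: AsyncBarrierSketch, awaitPayload and the producer argument are hypothetical names, and the producer stands in for the flow emitting a message on another thread. The subscriber only records the payload and releases the latch; the assertion then runs on the calling (test) thread.

```java
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class AsyncBarrierSketch {

    /**
     * Hypothetical helper: runs the producer on a separate thread, hands it a
     * subscriber, and blocks the caller until the subscriber has received a
     * payload or the timeout expires. Returns the payload, or null on timeout.
     */
    public static Object awaitPayload(Consumer<Consumer<Object>> producer, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        AtomicReference<Object> received = new AtomicReference<>();

        // The subscriber does no assertions: it stores the payload and opens the barrier.
        Consumer<Object> subscriber = payload -> {
            received.set(payload);
            latch.countDown();
        };

        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            // Subscribe first, then let the producer emit on its own thread.
            executor.submit(() -> producer.accept(subscriber));
            boolean arrived = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return arrived ? received.get() : null;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // The producer here stands in for the flow sending a Map payload.
        Object payload = awaitPayload(sub -> sub.accept(Map.of("a.xml", "content")), 2000);

        // Verification happens on the calling thread, after the barrier.
        if (!(payload instanceof Map)) {
            throw new AssertionError("expected a Map payload but got: " + payload);
        }
        System.out.println("received a Map payload");
    }
}
```

In the real test the same shape would presumably apply: the handler passed to xmlChannel.subscribe(...) takes the place of the subscriber, and the assertThat() call moves after the latch.await(...) in the test method, so a failure is reported by JUnit.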