How to correctly fan out a list in CompletableFutures

Time:07-12

I'm trying to fan out from a method that returns a CompletableFuture<List> into another method that is called once per list element and itself returns a CompletableFuture. Afterwards I want to return the CompletableFuture.allOf of the futures produced for the list.

In essence I have the following methods (assume they live in their own service classes; I just combined them here for brevity):

@Async
public CompletableFuture<List<File>> getMatchingCsvFrom(Path directory, Pattern pattern) {
    ...some work here
}

@Async
public CompletableFuture<Boolean> processSingleCsv(File csvFile) {
    ...some work here
}

I'm trying to call them like this:

public CompletableFuture<Void> processDirectory(Path directory) {
    CompletableFuture<List<File>> matchingCsvFrom = fileProcessingService.getMatchingCsvFrom(directory, PROCESS_PATTERN);

    List<CompletableFuture<Boolean>> processFutures = matchingCsvFrom.get().stream()
            .map(file -> processService.processSingleCsv(file))
            .collect(Collectors.toList());
    return CompletableFuture.allOf(processFutures.toArray(new CompletableFuture[0]));
}

The .get() is obviously a problem there because it blocks the calling thread, but I haven't been able to remove it using .thenApply(), .thenAccept(), or .thenCompose().

Unfortunately, all the other answers I found accomplish the opposite (going from List<CompletableFuture> to CompletableFuture<List>). I'd appreciate any suggestions!

Answer:

public CompletableFuture<Void> processDirectory(Path directory) {
    CompletableFuture<List<File>> matchingCsvFrom = fileProcessingService.getMatchingCsvFrom(directory, PROCESS_PATTERN);

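    // thenCompose runs the lambda once the list is available and flattens the
    // future it returns, so the result is CompletableFuture<Void>, not a nested future.
    return matchingCsvFrom.thenCompose(list -> {
        // Start one task per file; stream directly into an array for allOf.
        CompletableFuture<?>[] processFutures = list.stream()
            .map(file -> processService.processSingleCsv(file))
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(processFutures);
    });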
}
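
The reason thenCompose works where thenApply does not: the lambda itself returns a future (the allOf), so thenApply would produce a CompletableFuture<CompletableFuture<Void>>, whereas thenCompose flattens it. Below is a minimal, self-contained sketch of the same pattern that runs without Spring. The names FanOutSketch, listNames, processOne, and PROCESSED are hypothetical stand-ins for the services in the question, and plain supplyAsync on the common pool is assumed in place of @Async.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;

final class FanOutSketch {

    // Records which items were processed; the queue is thread-safe because
    // the per-item tasks may run on different pool threads.
    static final ConcurrentLinkedQueue<String> PROCESSED = new ConcurrentLinkedQueue<>();

    // Hypothetical stand-in for getMatchingCsvFrom: produces the list asynchronously.
    static CompletableFuture<List<String>> listNames() {
        return CompletableFuture.supplyAsync(() -> List.of("a.csv", "b.csv", "c.csv"));
    }

    // Hypothetical stand-in for processSingleCsv: one asynchronous task per element.
    static CompletableFuture<Boolean> processOne(String name) {
        return CompletableFuture.supplyAsync(() -> PROCESSED.add(name));
    }

    // Fan out without blocking: when the list arrives, start one task per
    // element and return a future that completes when all of them have.
    static CompletableFuture<Void> processAll() {
        return listNames().thenCompose(names -> {
            CompletableFuture<?>[] tasks = names.stream()
                .map(FanOutSketch::processOne)
                .toArray(CompletableFuture[]::new);
            return CompletableFuture.allOf(tasks);
        });
    }

    public static void main(String[] args) {
        // join() blocks only here, at the outermost caller, to observe the result.
        processAll().join();
        System.out.println("processed " + PROCESSED.size() + " files");
    }
}
```

Note that allOf yields CompletableFuture<Void>, so the individual Boolean results are discarded; if they are needed, they have to be read from the per-file futures after allOf completes.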