This question is the same as this one, which was never really answered (the code there only uses one thread). My code currently looks like this:
CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
    foo.stream().parallel()
        .forEach(bar -> {
            // business logic
        });
    return null;
}, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));
completableFuture.get();
Yet only one thread is correctly traced. Using .parallelStream() or a LazyTraceExecutor directly instead of a TraceableExecutorService didn't help.
CodePudding user response:
Seems to work thanks to this example. The snippet above becomes:
TraceableExecutorService executorService = new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
CompletableFuture.allOf(runnablesBusinessLogic(foo,executorService)).get();
where runnablesBusinessLogic is
// Submits one task per element to the traced executor and returns the resulting futures.
@SuppressWarnings("unchecked") // a generic array cannot be created directly, so a raw array is converted
private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (FooBar f : foo) {
        futures.add(CompletableFuture.runAsync(() -> businessLogic(f), executorService));
    }
    return futures.toArray(new CompletableFuture[0]);
}
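If I understood the example (and the discussion behind the current state of the documentation) correctly, Sleuth cannot automatically work with a ForkJoinPool (and therefore with parallel streams). A parallel stream started inside supplyAsync hands its work to the common ForkJoinPool rather than to the executor passed in, which would explain why only the thread that started the stream was traced. The main idea to make it work is not to create a single CompletableFuture and split the work inside it, but to create several futures on the traced executor, one per item, and join them.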
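To see the difference without Sleuth on the classpath, here is a minimal plain-JDK sketch. It is only an illustration under assumptions: the class name ThreadUsageDemo, the "traced-" thread-name prefix and the helper methods are made up for this example, and the thread names merely stand in for "this thread would carry the trace context". It records which threads execute the work in both variants.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

class ThreadUsageDemo {

    // Hypothetical prefix: threads with this name belong to "our" executor,
    // i.e. the one that would be wrapped by TraceableExecutorService.
    static final String PREFIX = "traced-";

    static ExecutorService newNamedPool(int threads) {
        AtomicInteger counter = new AtomicInteger();
        return Executors.newFixedThreadPool(
                threads, r -> new Thread(r, PREFIX + counter.incrementAndGet()));
    }

    // Variant from the question: one future that runs a parallel stream.
    // Returns the names of all threads that processed an element.
    static Set<String> parallelStreamInsideFuture(List<Integer> items, ExecutorService pool) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CompletableFuture.runAsync(
                () -> items.parallelStream()
                        .forEach(i -> names.add(Thread.currentThread().getName())),
                pool).join();
        return names;
    }

    // Variant from the answer: one future per element, all on the given executor.
    static Set<String> oneFuturePerItem(List<Integer> items, ExecutorService pool) {
        Set<String> names = ConcurrentHashMap.newKeySet();
        CompletableFuture<?>[] futures = items.stream()
                .map(i -> CompletableFuture.runAsync(
                        () -> names.add(Thread.currentThread().getName()), pool))
                .toArray(CompletableFuture[]::new);
        CompletableFuture.allOf(futures).join();
        return names;
    }

    public static void main(String[] args) {
        List<Integer> items = IntStream.range(0, 32).boxed().collect(Collectors.toList());
        ExecutorService pool = newNamedPool(4);
        try {
            System.out.println("parallel stream in one future: "
                    + parallelStreamInsideFuture(items, pool));
            System.out.println("one future per item:           "
                    + oneFuturePerItem(items, pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

In the second variant every recorded name starts with the pool's prefix, because each task is submitted to that executor directly. In the first variant the set will typically also contain ForkJoinPool.commonPool worker names, although the exact mix depends on the machine and the amount of work, so the printed output is not deterministic.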