Propagate Sleuth baggage on parallel streams


This question is essentially the same as an earlier one, which wasn't actually answered (the code there only uses a single thread). My code currently looks like this:

CompletableFuture<Void> completableFuture = CompletableFuture.supplyAsync(() -> {
    foo.stream().parallel()
            .forEach(bar -> {
                // business logic
            });
    return null;
}, new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooBarStream"));

completableFuture.get();

yet only one thread is correctly traced. Using .parallelStream() or a LazyTraceExecutor directly instead of a TraceableExecutorService didn't help.
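For reference, the LazyTraceExecutor variant mentioned above would look roughly like the sketch below (assuming Sleuth's LazyTraceExecutor(BeanFactory, Executor) constructor from org.springframework.cloud.sleuth.instrument.async). The parallel stream still fans its work out on the common ForkJoinPool, whose threads are not instrumented, which is presumably why only the submitting thread ends up traced:

// Minimal sketch of the attempted variant; not a working fix.
Executor tracedExecutor =
        new LazyTraceExecutor(this.beanFactory, Executors.newFixedThreadPool(threads));

CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> {
    // Only this submitting thread carries the Sleuth context;
    // the parallel stream's worker threads come from the common ForkJoinPool.
    foo.stream().parallel().forEach(bar -> {
        // business logic
    });
    return null;
}, tracedExecutor);

future.get();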

CodePudding user response:

It seems to work, following this example. The snippet above becomes:

TraceableExecutorService executorService =
        new TraceableExecutorService(this.beanFactory, Executors.newFixedThreadPool(threads), "fooStream");
CompletableFuture.allOf(runnablesBusinessLogic(foo, executorService)).get();

where runnablesBusinessLogic is

private CompletableFuture<Void>[] runnablesBusinessLogic(List<FooBar> foo, ExecutorService executorService) {
    List<CompletableFuture<Void>> futures = new ArrayList<>();
    for (FooBar f : foo) {
        // Each element gets its own future, submitted to the traced executor,
        // so the Sleuth context is propagated to every worker thread.
        futures.add(CompletableFuture.runAsync(() -> businessLogic(f), executorService));
    }
    return futures.toArray(new CompletableFuture[0]);
}
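For completeness, here is a minimal sketch of a businessLogic body used only to check the propagation; the FooService logger name and the log statement are placeholders (the real business logic is not shown in the question), using SLF4J's Logger/LoggerFactory. With Sleuth's default MDC/logging integration, each line should carry the same trace id as the thread that submitted the futures:

private static final Logger log = LoggerFactory.getLogger(FooService.class);

private void businessLogic(FooBar f) {
    // Runs on a thread of the pool wrapped by TraceableExecutorService,
    // so the trace/span ids in the log output should match the caller's.
    log.info("processing {}", f);
}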

If I understood the example (and the discussion behind the current state of the documentation) correctly, Sleuth cannot instrument a ForkJoinPool (and therefore parallel streams) automatically. The key idea is not to create a single CompletableFuture that splits the work internally, but to create one future per element on a traced executor and join them.
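The same idea can also be written with an ordinary (sequential) stream that maps each element to its own future on the traced executor and then joins them; a minimal sketch reusing the names from the snippets above:

CompletableFuture<?>[] futures = foo.stream()
        .map(f -> CompletableFuture.runAsync(() -> businessLogic(f), executorService))
        .toArray(CompletableFuture[]::new);

// Wait for all per-element futures; each one ran with the Sleuth context propagated.
CompletableFuture.allOf(futures).join();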
