How do I use multiple reactive streams in the same pipeline?

Time:04-01

I'm using WebFlux to pull data from two different REST endpoints, and trying to correlate some data from one stream with the other. I have Flux instances called events and egvs and for each event, I want to find the EGV with the nearest timestamp.

        final Flux<Tuple2<Double,Object>> data = events
                .map(e -> Tuples.of(e.getValue(),
                        egvs.map(egv -> Tuples.of(egv.getValue(),
                                        Math.abs(Duration.between(e.getDisplayTime(),
                                                egv.getDisplayTime()).toSeconds())))
                                .sort(Comparator.comparingLong(Tuple2::getT2))
                                .take(1)
                                .map(v -> v.getT1())));

When I send data to my Thymeleaf template, the first element of the tuple renders as a number, as I'd expect, but the second element renders as a FluxMapFuseable. It appears that the egvs.map(...) portion of the pipeline isn't executing. How do I get that part of the pipeline to execute?

UPDATE

Thanks, @Toerktumlare - your answer helped me figure out that my approach was wrong. On each iteration through the map operation, the event needs the context of the entire set of EGVs to find the one it matches with. So the working code looks like this:

        final Flux<Tuple2<Double, Double>> data =
                Flux.zip(events, egvs.collectList().repeat())
                        .map(t -> Tuples.of(
                                // Grab the event
                                t.getT1().getValue(),
                                // Find the EGV (from the full set of EGVs) with the closest timestamp
                                t.getT2().stream()
                                        .map(egv -> Tuples.of(
                                                egv.getValue(),
                                                Math.abs(Duration.between(
                                                        t.getT1().getDisplayTime(),
                                                        egv.getDisplayTime()).toSeconds())))
                                        // Sort the stream of (value, time difference) tuples and
                                        // take the smallest time difference.
                                        .sorted(Comparator.comparingLong(Tuple2::getT2))
                                        .map(Tuple2::getT1)
                                        .findFirst()
                                        .orElse(0.)));
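The per-event lookup in the middle of that pipeline can be pulled out and tried on its own, without any reactive types. This is only a sketch: the `Egv` class here is a hypothetical stand-in for the real type, `Instant` is an assumption about what `getDisplayTime()` returns, and it uses `Stream.min` in place of sorting and taking the first element. It returns an empty `Optional` rather than `0.` when there are no EGVs.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;

class NearestEgv {
    // Hypothetical stand-in for the EGV type; only the two accessors used above are modeled.
    static final class Egv {
        private final double value;
        private final Instant displayTime;

        Egv(double value, Instant displayTime) {
            this.value = value;
            this.displayTime = displayTime;
        }

        double getValue() { return value; }
        Instant getDisplayTime() { return displayTime; }
    }

    // Returns the value of the EGV whose timestamp is closest to eventTime,
    // or an empty Optional if the list of EGVs is empty.
    static Optional<Double> nearestValue(Instant eventTime, List<Egv> egvs) {
        return egvs.stream()
                // Compare by absolute time difference; min() avoids sorting the whole list.
                .min(Comparator.comparing(
                        (Egv egv) -> Duration.between(eventTime, egv.getDisplayTime()).abs()))
                .map(Egv::getValue);
    }

    public static void main(String[] args) {
        // Arbitrary sample data, not taken from the real endpoints.
        Instant t0 = Instant.parse("2023-04-01T12:00:00Z");
        List<Egv> egvs = Arrays.asList(
                new Egv(100.0, t0.minusSeconds(300)),
                new Egv(110.0, t0.plusSeconds(20)),
                new Egv(120.0, t0.plusSeconds(320)));

        // The event is 60 s after t0, so the EGV at t0 + 20 s is the closest one.
        System.out.println(nearestValue(t0.plusSeconds(60), egvs));
    }
}
```

With the sample data in `main`, the time differences are 360 s, 40 s and 260 s, so the lookup picks the EGV with value 110.0.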

Answer:

I think you are breaking the reactive chain.

When the chain is subscribed to, Reactor walks the operators backwards until it reaches a producer that can start emitting items, and I think you are breaking that chain here:

egvs.map(egv -> Tuples.of( ..., ... )

You see, egvs.map(...) returns a publisher that you need to take care of by chaining it onto what events.map returns.

I'll give you an example:

// This works because we always return from flatMap,
// so we keep the chain intact
Mono.just("foobar").flatMap(f -> {
    return Mono.just(f);
}).subscribe(s -> {
    System.out.println(s);
});

On the other hand, this behaves differently:

Mono.just("foobar").flatMap(f -> {
    Mono.just("foo").doOnSuccess(s -> { System.out.println("this will never print"); });
    return Mono.just(f);
});

In this example the inner Mono is created, but its return value is ignored, so nothing ever subscribes to it and the chain is broken.
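Here is a runnable version of the two cases combined, as a minimal sketch that assumes reactor-core is on the classpath. The class name `BrokenChainDemo` and the `log` list are made up for illustration; the list just records which side effects actually ran.

```java
import java.util.ArrayList;
import java.util.List;

import reactor.core.publisher.Mono;

class BrokenChainDemo {
    // Runs the pipeline once and returns the side effects that were observed, in order.
    static List<String> run() {
        List<String> log = new ArrayList<>();

        Mono<String> result = Mono.just("foobar").flatMap(f -> {
            // Assembled but neither returned nor subscribed to:
            // this side effect never runs.
            Mono.just("foo").doOnSuccess(s -> log.add("dropped: " + s));

            // Returned from flatMap, so it is subscribed to as part of the chain.
            return Mono.just(f).doOnSuccess(s -> log.add("chained: " + s));
        });

        // Nothing has executed so far; block() subscribes and waits for the value.
        log.add("result: " + result.block());
        return log;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

The log ends up with "chained: foobar" followed by "result: foobar". The "dropped" entry never appears, which is the same reason the inner egvs.map(...) in the question never executes.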

You haven't really disclosed what egv actually is, so I won't be able to give you a full answer, but you should most likely do something like this:

final Flux<Tuple2<Double, Double>> data = events
            // chain on egvs here instead; flatMap subscribes to the inner Flux,
            // so it becomes part of the chain
            .flatMap(e -> egvs
                    .map(egv -> Tuples.of(egv.getValue(),
                            Math.abs(Duration.between(e.getDisplayTime(), egv.getDisplayTime()).toSeconds())))
                    .sort(Comparator.comparingLong(Tuple2::getT2))
                    .take(1)
                    // and then return your full tuple object instead
                    .map(v -> Tuples.of(e.getValue(), v.getT1())));

I don't have a compiler to check against at the moment, but I believe that is your problem at least. Your code is a bit tricky to read.
