I'm using WebFlux to pull data from two different REST endpoints, and I'm trying to correlate data from one stream with the other. I have Flux instances called events and egvs, and for each event I want to find the EGV with the nearest timestamp.
final Flux<Tuple2<Double,Object>> data = events
.map(e -> Tuples.of(e.getValue(),
egvs.map(egv -> Tuples.of(egv.getValue(),
Math.abs(Duration.between(e.getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
.sort(Comparator.comparingLong(Tuple2::getT2))
.take(1)
.map(v -> v.getT1())));
When I send data to my Thymeleaf template, the first element of the tuple renders as a number, as I'd expect, but the second element renders as a FluxMapFuseable. It appears that the egvs.map(...) portion of the pipeline isn't executing. How do I get that part of the pipeline to execute?
UPDATE
Thanks, @Toerktumlare - your answer helped me figure out that my approach was wrong. On each iteration through the map operation, the event needs the context of the entire set of EGVs to find the one it matches with. So the working code looks like this:
final Flux<Tuple2<Double, Double>> data =
Flux.zip(events, egvs.collectList().repeat())
.map(t -> Tuples.of(
// Grab the event
t.getT1().getValue(),
// Find the EGV (from the full set of EGVs) with the closest timestamp
t.getT2().stream()
.map(egv -> Tuples.of(
egv.getValue(),
Math.abs(Duration.between(
t.getT1().getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
// Sort the stream of (value, time difference) tuples and
// take the smallest time difference.
.sorted(Comparator.comparingLong(Tuple2::getT2))
.map(Tuple2::getT1)
.findFirst()
.orElse(0.)));
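For anyone who wants to try the per-event lookup in isolation, here is a minimal sketch using only the JDK. The Egv record and the nearestValue helper are hypothetical stand-ins (the real classes are not shown in this post), and I am assuming the display time is an Instant. It uses Stream.min instead of sorting and taking the first element, which finds the same match without ordering the whole list.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;

class NearestEgv {
    // Hypothetical stand-in for the EGV type; the real class is not shown in the post.
    record Egv(double value, Instant displayTime) {}

    // Returns the value of the EGV closest in time to eventTime,
    // or 0.0 when the list is empty (mirrors the orElse(0.) above).
    static double nearestValue(Instant eventTime, List<Egv> egvs) {
        return egvs.stream()
                .min(Comparator.comparingLong((Egv egv) ->
                        Math.abs(Duration.between(eventTime, egv.displayTime()).toSeconds())))
                .map(Egv::value)
                .orElse(0.0);
    }

    public static void main(String[] args) {
        Instant base = Instant.parse("2023-01-01T00:00:00Z");
        List<Egv> egvs = List.of(
                new Egv(100.0, base),
                new Egv(110.0, base.plusSeconds(300)),
                new Egv(120.0, base.plusSeconds(600)));
        // The event at +280s is closest to the reading at +300s.
        System.out.println(nearestValue(base.plusSeconds(280), egvs)); // prints 110.0
    }
}
```

In the reactive version, a helper like this could be called from inside the map over the zipped (event, list of EGVs) tuples.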
CodePudding user response:
I think you are breaking the reactive chain.
Nothing runs until something subscribes. At subscription time,
Reactor walks the operators backwards until it finds a producer
that can start emitting items, and I think you are breaking that chain here:
egvs.map(egv -> Tuples.of( ..., ... )
You see, egvs.map(...) returns a publisher that nothing subscribes to. You need to take care of that return value
and chain it onto what events.map returns.
I'll give you an example:
// This works because we always return from flatMap,
// so we keep the chain intact
Mono.just("foobar").flatMap(f -> {
    return Mono.just(f);
}).subscribe(s -> {
    System.out.println(s);
});
On the other hand, this behaves differently:
Mono.just("foobar").flatMap(f -> {
Mono.just("foo").doOnSuccess(s -> { System.out.println("this will never print"); });
return Mono.just(f);
});
In this example we ignore the return value of the inner Mono, so nothing ever subscribes to it,
thus breaking the chain.
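To make the difference concrete, here is a small sketch that counts how many items an inner Flux actually emits when it is returned from map versus flatMap. The class and method names are made up for illustration, and it assumes reactor-core is on the classpath. With map, the inner Flux is just an object passed downstream and nobody subscribes to it; with flatMap, Reactor subscribes to it and flattens its items into the outer stream.

```java
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

class MapVersusFlatMap {
    // map() hands the inner Flux downstream as a plain object; it is never subscribed.
    static int innerEmissionsWithMap() {
        AtomicInteger seen = new AtomicInteger();
        Flux.just(1, 2)
                .map(e -> Flux.just(10, 20).doOnNext(v -> seen.incrementAndGet()))
                .blockLast();
        return seen.get();
    }

    // flatMap() subscribes to each inner Flux and merges its items into the outer stream.
    static int innerEmissionsWithFlatMap() {
        AtomicInteger seen = new AtomicInteger();
        Flux.just(1, 2)
                .flatMap(e -> Flux.just(10, 20).doOnNext(v -> seen.incrementAndGet()))
                .blockLast();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(innerEmissionsWithMap());     // prints 0
        System.out.println(innerEmissionsWithFlatMap()); // prints 4
    }
}
```

This is the same reason a template ends up rendering an operator class name such as FluxMapFuseable: it received the unsubscribed Flux object itself rather than a value produced by it.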
You haven't really disclosed what egvs
actually is, so I won't be able to give you a full answer, but you should most likely do something like this:
// Assumes getValue() returns a Double for both events and EGVs
final Flux<Tuple2<Double, Double>> data = events
// chain on egvs here instead; flatMap subscribes to the inner Flux
// and flattens its result into the outer chain
.flatMap(e -> egvs
.map(egv -> Tuples.of(egv.getValue(),
Math.abs(Duration.between(e.getDisplayTime(),
egv.getDisplayTime()).toSeconds())))
.sort(Comparator.comparingLong(Tuple2::getT2))
.take(1)
// and then return your full tuple object instead
.map(v -> Tuples.of(e.getValue(), v.getT1())));
I don't have a compiler to check against at the moment, but I believe that is your problem at least. Your code is a bit tricky to read.