I am trying solve my problem when i need to get last element (last method) of a flux but in some cases these flux can be empty and the follow error is appear
Flux#last() didn't observe any onNext signal
and this is the chain i have
return apiService.getAll(entry)
.flatMap(response -> {
if (response.getId() != null){
//do some logic
return Mono.just("some Mono");
}
else{
return Mono.empty();
}
})
.last()
//more flatMap operators
I already use switchIfEmpty()
as well but can't fix.
What is the correct implementation to verify if can call last() or skip and return a empty to terminate chain operation.
Thanks,
CodePudding user response:
According to Flux.last()
api doc:
emit NoSuchElementException error if the source was empty. For a passive version use takeLast(int)
It means that, for an empty upstream Flux:
last()
will emit an errortakeLast(1)
will return an empty flux
Now, takeLast(1)
returns a Flux, not a Mono, as last() does. Then, you can just chain it with Flux.next()
, and it will return the only retained value (if any), or propagate the empty signal.
Note: another solution would be to use last().onErrorResume(NoSuchElementException.class, err -> Mono.empty())
.
This would catch the error sent by last()
internally, and then return an empty mono.
However, if you've got some code other than last()
that can throw a NoSuchElementException
, you might miss a problem. For this, my personal choice for your case would be to use takeLast(1).next()
.
The following code example shows behavior of last() vs takeLast(1).next()
:
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
public class FluxLast {
static void subscribe(Mono<?> publisher) {
publisher.subscribe(value -> {},
err -> System.out.println("Failed: " err.getMessage()),
() -> System.out.println("Completed empty"));
}
public static void main(String[] args) {
subscribe(Flux.empty().last());
subscribe(Flux.empty().takeLast(1).next());
// When not empty, takeLast(1).next() will return the last value
Integer v = Flux.just(1, 2, 3)
.takeLast(1)
.next()
.block();
System.out.println("Last value: " v);
}
}
Program output:
Failed: Flux#last() didn't observe any onNext signal from Callable flux
Completed empty
3