Flux last() operation when empty

Time:01-28

I need to get the last element of a Flux using last(), but in some cases the Flux can be empty, and then the following error appears:

Flux#last() didn't observe any onNext signal

This is the chain I have:

return apiService.getAll(entry)
      .flatMap(response -> {
          if (response.getId() != null) {
              // do some logic
              return Mono.just("some Mono");
          } else {
              return Mono.empty();
          }
      })
      .last()
      // more flatMap operators

I have already tried switchIfEmpty() as well, but it did not fix the problem. What is the correct way to check whether last() can be called, or otherwise skip it and return an empty result to terminate the chain?

Thanks,

CodePudding user response:

According to the Flux.last() API documentation:

emit NoSuchElementException error if the source was empty. For a passive version use takeLast(int)

It means that, for an empty upstream Flux:

  • last() will emit an error
  • takeLast(1) will return an empty flux

Unlike last(), which returns a Mono, takeLast(1) returns a Flux. You can chain it with Flux.next(), which returns the single retained value (if any) as a Mono, or propagates the empty completion.

Note: another solution would be to use last().onErrorResume(NoSuchElementException.class, err -> Mono.empty()). This catches the error emitted by last() and replaces it with an empty Mono. However, it also catches a NoSuchElementException raised by any other code upstream of onErrorResume, so it can hide a real problem. For this reason, my personal choice for your case would be to use takeLast(1).next().
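To make that risk concrete, here is a minimal sketch comparing the two approaches on an empty source and on a source that fails with its own NoSuchElementException. The names buggySource, viaLast, viaTakeLast and describe are made up for illustration, and the failing source only simulates a bug; it is not taken from the question.

```java
import java.util.NoSuchElementException;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class OnErrorResumePitfall {

    // Hypothetical source: emits one value, then fails with an unrelated
    // NoSuchElementException (stands in for a bug somewhere upstream).
    public static Flux<String> buggySource() {
        return Flux.just("a")
                   .concatWith(Flux.error(new NoSuchElementException("simulated upstream bug")));
    }

    // Approach from the note: let last() fail, then turn that error type into an empty Mono.
    public static Mono<String> viaLast(Flux<String> source) {
        return source.last()
                     .onErrorResume(NoSuchElementException.class, err -> Mono.empty());
    }

    // Approach recommended above: never raises an error just because the source is empty.
    public static Mono<String> viaTakeLast(Flux<String> source) {
        return source.takeLast(1).next();
    }

    // Helper (illustrative only): blocks and summarizes the outcome as text.
    public static String describe(Mono<String> mono) {
        try {
            return mono.map(v -> "value: " + v).defaultIfEmpty("empty").block();
        } catch (RuntimeException e) {
            return "error: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        // Empty source: both approaches complete empty.
        System.out.println(describe(viaLast(Flux.<String>empty())));
        System.out.println(describe(viaTakeLast(Flux.<String>empty())));

        // Failing source: onErrorResume hides the failure, takeLast(1).next() surfaces it.
        System.out.println(describe(viaLast(buggySource())));
        System.out.println(describe(viaTakeLast(buggySource())));
    }
}
```

With the failing source, the onErrorResume variant should complete empty as if nothing went wrong, while takeLast(1).next() lets the error reach the subscriber.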

The following code example shows the behavior of last() vs takeLast(1).next():

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class FluxLast {

    static void subscribe(Mono<?> publisher) {
        publisher.subscribe(value -> {},
                            err -> System.out.println("Failed: " + err.getMessage()),
                            () -> System.out.println("Completed empty"));
    }
    
    public static void main(String[] args) {
        subscribe(Flux.empty().last());
        subscribe(Flux.empty().takeLast(1).next());

        // When not empty, takeLast(1).next() will return the last value
        Integer v = Flux.just(1, 2, 3)
                            .takeLast(1)
                            .next()
                            .block();
        System.out.println("Last value: " + v);
    }
}

Program output:

Failed: Flux#last() didn't observe any onNext signal from Callable flux
Completed empty
Last value: 3
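
Applied to the chain from the question, the change could look roughly like the sketch below. Since apiService and its response type are not shown in the question, the Response class, the "processed ..." string and the run helper are hypothetical stand-ins; only the replacement of last() with takeLast(1).next() is the point.

```java
import java.util.Arrays;
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class LastOrEmptySketch {

    // Hypothetical stand-in for the element type emitted by apiService.getAll(entry).
    public static final class Response {
        private final String id;
        public Response(String id) { this.id = id; }
        public String getId() { return id; }
    }

    // Same shape as the chain in the question, with last() replaced by takeLast(1).next().
    public static Mono<String> lastProcessed(Flux<Response> responses) {
        return responses
                .flatMap(response -> {
                    if (response.getId() != null) {
                        // placeholder for "do some logic"
                        return Mono.just("processed " + response.getId());
                    }
                    return Mono.<String>empty();
                })
                .takeLast(1)  // keeps at most the final element, stays empty if none arrived
                .next();      // Flux -> Mono; an empty Flux becomes an empty Mono
    }

    // Helper for trying it out: a null id means "response without id"; blocks for the result.
    public static Optional<String> run(String... ids) {
        Flux<Response> responses = Flux.fromStream(Arrays.stream(ids).map(Response::new));
        return lastProcessed(responses).blockOptional();
    }

    public static void main(String[] args) {
        System.out.println(run("a", null, "b"));  // Optional[processed b]
        System.out.println(run((String) null));   // Optional.empty
    }
}
```

Any further flatMap operators chained after next() simply never run when the Mono is empty, so the chain terminates with a plain completion instead of an error.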