Immediately return first emitted value from two Monos while continuing to process the other asynchro

Time:02-01

I have two data sources, each returning a Mono:

class CacheCustomerClient {
    Mono<Entity> createCustomer(Customer customer)
}

class MasterCustomerClient {
    Mono<Entity> createCustomer(Customer customer)
}

Callers to my application are hitting a Spring WebFlux controller:

@PostMapping
@ResponseStatus(HttpStatus.CREATED)
public Flux<Entity> createCustomer(@RequestBody Customer customer) {
    // Pass the request body on to the service method shown further down
    return customerService.createCustomer(customer);
}

As long as either data source completes its create operation successfully, I want to return a success response to the caller immediately. However, I still want my service to keep processing the result of the other Mono, so that any error it encounters can be logged.

The problem seems to be that as soon as a value is returned to the controller, a cancel signal is propagated back through the stream by Spring WebFlux and, thus, no information is logged about a failure.

Here's one attempt:

public Flux<Entity> createCustomer(final Customer customer) {
        var cacheCreate = cacheClient
                .createCustomer(customer)
                .doOnError(WebClientResponseException.class,
                    err -> log.error("Customer creation failed in cache"));
        var masterCreate = masterClient
                .createCustomer(customer)
                .doOnError(WebClientResponseException.class,
                    err -> log.error("Customer creation failed in master"));

        return Flux.firstWithValue(cacheCreate, masterCreate)
                .onErrorMap((err) -> new Exception("Customer creation failed in cache and master"));
    }

Flux.firstWithValue() is great for emitting the first non-error value, but then whichever source is lagging behind is cancelled, meaning that any error is never logged out. I've also tried scheduling these two sources on their own Schedulers and that didn't seem to help either.
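The cancellation described above can be reproduced in isolation. The following is a minimal sketch, assuming Reactor 3.4 or later is on the classpath; fast(), slowFailing(), EVENTS and the class name are hypothetical stand-ins for the two clients and the logger, not part of the real application.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class CancelledLaggardDemo {

    // Hypothetical event log standing in for log.error(...)
    static final List<String> EVENTS = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for the source that succeeds first
    static Mono<String> fast() {
        return Mono.delay(Duration.ofMillis(100)).thenReturn("fast");
    }

    // Hypothetical stand-in for the slower source, which would fail if left running
    static Mono<String> slowFailing() {
        return Mono.delay(Duration.ofMillis(400))
                .then(Mono.<String>error(new IllegalStateException("slow source failed")))
                .doOnCancel(() -> EVENTS.add("cancelled"))
                .doOnError(err -> EVENTS.add("error logged"));
    }

    static String run() {
        // firstWithValue emits the first value and cancels the remaining source
        String winner = Mono.firstWithValue(fast(), slowFailing()).block();
        // Wait past the point where the slow source would have failed
        Mono.delay(Duration.ofMillis(600)).block();
        return winner;
    }

    public static void main(String[] args) {
        System.out.println(run());
        System.out.println(EVENTS);
    }
}
```

In this sketch the slow source records a cancellation and its doOnError callback never runs, which matches the missing log entry described above.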

How can I perform these two calls asynchronously, and emit the first value to the caller, while continuing to listen for emissions on the slower source?

Answer:

You can achieve that by turning your publishers into "hot" publishers with the share() operator:

  1. The first subscriber triggers the upstream publisher, and any additional subscribers receive the result cached from that first subscription:

    Further Subscriber will share [...] the same result.

  2. Once subscribed, the shared publisher is not cancelled when a downstream subscriber cancels:

    It's worth noting this is an un-cancellable Subscription.

So, to achieve your requirement:

  1. Apply share() to each of your publishers
  2. Subscribe to the shared publishers to trigger processing
  3. Use the shared publishers in your pipeline (here, firstWithValue).

Example:

import java.time.Duration;
import reactor.core.publisher.Mono;

public class TestUncancellableMono {

    // Mock a Mono that succeeds quickly
    static Mono<String> quickSuccess() {
        return Mono.delay(Duration.ofMillis(200)).thenReturn("SUCCESS !");
    }

    // Mock a mono taking more time and ending in error.
    static Mono<String> longError() {
        return Mono.delay(Duration.ofSeconds(1))
                   .<String>then(Mono.error(new Exception("ERROR !")))
                   .doOnCancel(() -> System.out.println("CANCELLED"))
                   .doOnError(err -> System.out.println(err.getMessage()));
    }

    public static void main(String[] args) throws Exception {
        // Transform to hot publisher
        var sharedQuick = quickSuccess().share();
        var sharedLong  = longError().share();

        // Trigger launch
        sharedQuick.subscribe();
        sharedLong.subscribe();

        // Subscribe back to get the cached result
        Mono
                .firstWithValue(sharedQuick, sharedLong)
                .subscribe(System.out::println, err -> System.out.println(err.getMessage()));

        // Wait for subscription to end.
        Thread.sleep(2000);
    }
}

The output of the sample is:

SUCCESS !
ERROR !

We can see that the error message has been propagated properly, and that the upstream publisher has not been cancelled.
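Applied to the service method from the question, the same idea might look roughly like the sketch below. It assumes Reactor 3.4 or later; cacheCreate(), masterCreate(), LOG, demo() and the class name are hypothetical stand-ins (String replaces Entity and Customer so the example is self-contained), and Mono.defer is an added design choice so that nothing starts until the returned Mono is subscribed.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Mono;

public class SharedCreateSketch {

    // Hypothetical log sink standing in for log.error(...)
    static final List<String> LOG = new CopyOnWriteArrayList<>();

    // Hypothetical stand-in for cacheClient.createCustomer: succeeds quickly
    static Mono<String> cacheCreate(String customer) {
        return Mono.delay(Duration.ofMillis(100)).thenReturn("cache:" + customer);
    }

    // Hypothetical stand-in for masterClient.createCustomer: fails later
    static Mono<String> masterCreate(String customer) {
        return Mono.delay(Duration.ofMillis(400))
                .then(Mono.<String>error(new IllegalStateException("master down")));
    }

    static Mono<String> createCustomer(String customer) {
        // defer: start the two calls only when the caller subscribes
        return Mono.defer(() -> {
            Mono<String> cache = cacheCreate(customer)
                    .doOnError(err -> LOG.add("Customer creation failed in cache"))
                    .share();
            Mono<String> master = masterCreate(customer)
                    .doOnError(err -> LOG.add("Customer creation failed in master"))
                    .share();

            // Independent subscriptions keep both calls running to completion;
            // the empty error consumers avoid "error callback not implemented" noise
            cache.subscribe(value -> { }, err -> { });
            master.subscribe(value -> { }, err -> { });

            // The caller gets the first value; the slower source is still observed above
            return Mono.firstWithValue(cache, master)
                    .onErrorMap(err -> new IllegalStateException(
                            "Customer creation failed in cache and master", err));
        });
    }

    // Runs one request, then waits long enough for the slower source to finish
    static String demo() {
        String first = createCustomer("alice").block();
        Mono.delay(Duration.ofMillis(600)).block();
        return first;
    }

    public static void main(String[] args) {
        System.out.println(demo());
        System.out.println(LOG);
    }
}
```

Here the caller receives the cache result first, and the later failure of the master call is still recorded, because the doOnError callbacks sit upstream of share() and therefore run regardless of what firstWithValue cancels.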
