Home > Software engineering >  Extract variable from spring webflux reactive pipeline
Extract variable from spring webflux reactive pipeline

Time:09-21

I am working on reactive streams using Spring webflux. I want to extract a variable(name) from the middle of the reactive pipeline and use it in a different place as follows.

public class Example {

    public Mono<String> test() {
        String name;

        return Mono.just("some random string")
                    .map(s -> {
                        name = s.toUpperCase(); 
                        return name;
                    }).map(...)
                      .map(...)
                      .flatMap(...)
                      .map(...)
                      .map(result -> result name)
                      .doOnSuccess(res -> asyncPublish(name));
     
     public void asyncPublish(String name) {
        // logic to write to a Messaging queue asynchronously
     }
    }
}

The above code is not working. This is a contrived example but shows what I want to achieve.

Note: I don't want to use multiple zip operators just to bring the name all the way to the last map where I want to use it. Is there a way I can store it in a variable as shown above and then use it somewhere else whereever I need it.

CodePudding user response:

You might use for example a Tuple2 to pass the value of name along with the modified data through the chain.

return Mono.just("some random string")
            .map(s -> s.toUpperCase())
            .map(s -> Tuples.of(s, x(s))) // given that x(s) is the transformation of this map statement
            .map(...) // keeping Tuple with the value of `name` in the first slot...
            .flatMap(...) // keeping Tuple with the value of `name` in the first slot...
            .map(resultTuple -> Tuples.of(resultTuple.getT1(), resultTuple.getT2()   resultTuple.getT1()) // keeping Tuple with the value of `name` in the first slot...
            .doOnSuccess(resultTuple -> asyncPublish(resultTuple.getT1()))
            .map(resultTuple -> resultTuple.getT2()); // in case that the returned Mono should contain the modified value...

Tuples is from the package reactor.util.function and part of reactor-core.

Another way (without passing the value through the chain using Tuples) could be to use AtomicReference (but I still think that the Tuple way is cleaner). The AtomicReference way might look like this:

public Mono<String> test() {
    final AtomicReference<String> nameRef = new AtomicReference<>();

    return Mono.just("some random string")
                .map(s -> {
                    final String name = s.toUpperCase(); 
                    nameRef.set(name);
                    return name;
                }).map(...)
                  .map(...)
                  .flatMap(...)
                  .map(...)
                  .map(result -> result nameRef.get())
                  .doOnSuccess(res -> asyncPublish(nameRef.get()));
 
 public void asyncPublish(String name) {
    // logic to write to a Messaging queue asynchronously
 }
}
  • Related