I am working on reactive streams using Spring webflux. I want to extract a variable(name
) from the middle of the reactive pipeline and use it in a different place as follows.
public class Example {
public Mono<String> test() {
String name;
return Mono.just("some random string")
.map(s -> {
name = s.toUpperCase();
return name;
}).map(...)
.map(...)
.flatMap(...)
.map(...)
.map(result -> result name)
.doOnSuccess(res -> asyncPublish(name));
public void asyncPublish(String name) {
// logic to write to a Messaging queue asynchronously
}
}
}
The above code is not working. This is a contrived example but shows what I want to achieve.
Note: I don't want to use multiple zip
operators just to bring the name
all the way to the last map where I want to use it. Is there a way I can store it in a variable as shown above and then use it somewhere else whereever I need it.
CodePudding user response:
You might use for example a Tuple2 to pass the value of name
along with the modified data through the chain.
return Mono.just("some random string")
.map(s -> s.toUpperCase())
.map(s -> Tuples.of(s, x(s))) // given that x(s) is the transformation of this map statement
.map(...) // keeping Tuple with the value of `name` in the first slot...
.flatMap(...) // keeping Tuple with the value of `name` in the first slot...
.map(resultTuple -> Tuples.of(resultTuple.getT1(), resultTuple.getT2() resultTuple.getT1()) // keeping Tuple with the value of `name` in the first slot...
.doOnSuccess(resultTuple -> asyncPublish(resultTuple.getT1()))
.map(resultTuple -> resultTuple.getT2()); // in case that the returned Mono should contain the modified value...
Tuples
is from the package reactor.util.function
and part of reactor-core.
Another way (without passing the value through the chain using Tuples) could be to use AtomicReference
(but I still think that the Tuple way is cleaner). The AtomicReference way might look like this:
public Mono<String> test() {
final AtomicReference<String> nameRef = new AtomicReference<>();
return Mono.just("some random string")
.map(s -> {
final String name = s.toUpperCase();
nameRef.set(name);
return name;
}).map(...)
.map(...)
.flatMap(...)
.map(...)
.map(result -> result nameRef.get())
.doOnSuccess(res -> asyncPublish(nameRef.get()));
public void asyncPublish(String name) {
// logic to write to a Messaging queue asynchronously
}
}