Home > Software engineering >  Converting nested for loops to reactive Flux
Converting nested for loops to reactive Flux

Time:10-24

I have a code logic that needs to be in a series of nested loops. Here is the sample:

String q = null;
for(C c in getC()){
    if(Valid(c)){
        String Id = c.Id;
        List<M> mList = getM(Id);
        for(M m in mList){
            q = Message.Format("query:{}",m);
            List<D> dList = getD(Id, q);
            sendDToKafka(dList);
        }
    }
}

I am trying to convert the above logic to be reactive using project reactor. The code I have so far:

Flux.fromIterable(getC())
    .map(c -> c.getId)
    .doOnNext(cId -> getM(cId))
    .map(m -> m.trim())
    .doOnNext(m -> getD()) // need to pass in query and Id to getD()
    .subscribe();

There are few issues I am facing:

  1. How can I incorporate the IsValid() method into the query.
  2. I need to reuse the cId value I get on the first map - .map(c -> c.getId), in two places. How can I keep track of that value if not used immediately in the next step.
  3. Is there a way to form the q variable within the reactive query to pass as argument in getD()
  4. I would really appreciate any feedback if the code is an efficient approach.

CodePudding user response:

First of all, doOnNext methods are for side-effects and not for changing the flow of events. Also, if you are converting something to reactive, the whole pipeline needs to be non-blocking and you should avoid calling any code that is blocking. In case if you have blocking code that can't be changed, you can follow the advice here:

For filtering you can use filter and for using cId at multiple places either pass it as tuple down the chain (lot of libraries out there) or you can create your own class for that purpose.

CodePudding user response:

  1. How can I incorporate the IsValid() method into the query.

There's an operator for that:

Flux.fromIterable(getC())
            .filter(c -> valid(c))
  1. I need to reuse the cId value I get on the first map - .map(c -> c.getId), in two places. How can I keep track of that value if not used immediately in the next step.

In the particular simple case you can simply use nested flatmaps like this:

.flatMap(id ->
        Flux.fromIterable(getM(id))
            .flatMap(m -> {
                String q = Message.Format("query: {}", m);
                List<D> dList = getD(id, q);
                return sendDToKafka(dList);
            })
    )

See What is a good idiom for nested flatMaps in Java Reactor?

  1. Is there a way to form the q variable within the reactive query to pass as argument in getD()

You can't use the non-final q variable within the stream. Have a look at AtomicReference<String>.

As already mentioned in the other answer, doOnNext is for side effects, map is for mapping something from one type to another type.

  • Related