How to implement pipeline design pattern using reactive webflux?

Time:02-02

I have two simple interfaces, Processor and SeedPreProcessor, defined as follows:

Processor:

public interface Processor<I, O> {

    Mono<O> process(I input);
}

SeedPreProcessor:

public interface SeedPreProcessor<D> extends Processor<D, D> {

    /**
     * Specify the location of this processor in the pipeline.
     *
     * @return the order
     */
    Integer order();

    String name();
}

and a PipeLine class defined like this:

public class PipeLine {

    private final List<SeedPreProcessor<PreProcessorDocument>> allProcessors;

    public PipeLine(List<SeedPreProcessor<PreProcessorDocument>> allProcessors) {
        this.allProcessors = new ArrayList<>(allProcessors);
        this.allProcessors.sort(comparingInt(SeedPreProcessor::order));
    }

    public Mono<PreProcessorDocument> execute(String url) {
        log.info("Start processing URL = {}", url);
        var initial = new PreProcessorDocument(url);
        return Flux
                .fromIterable(allProcessors)
                .map(proc -> proc.process(initial).t) // my problem is here
    }
}

Given an initial PreProcessorDocument, I want to execute every SeedPreProcessor in the allProcessors list one by one.

How can I achieve this?

Answer:

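A simple for-each loop combined with the flatMap operator will do the trick. Each flatMap feeds the output of the previous processor into the next one, so the processors run sequentially in list order: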

public Mono<PreProcessorDocument> execute(String url) {
    var initial = new PreProcessorDocument(url);
    Mono<PreProcessorDocument> m = Mono.just(initial);

    for (SeedPreProcessor<PreProcessorDocument> p : allProcessors) {
        m = m.flatMap(p::process);
    }
    return m;
}
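
To see why this loop runs the steps strictly one after another, the same fold can be sketched with the JDK's CompletableFuture, where thenCompose plays roughly the role that flatMap plays on a Mono. This is a stand-in chosen so that it runs without Reactor on the classpath, not the Reactor code from the answer; the class name PipelineSketch, the Step interface and the sample steps are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

public class PipelineSketch {

    // Hypothetical stand-in for SeedPreProcessor: an asynchronous step from D to D.
    interface Step<D> extends Function<D, CompletableFuture<D>> {}

    // Folds the steps into a single chain. Each step is invoked only once the
    // previous step's result is available, and receives that result as input.
    static <D> CompletableFuture<D> execute(D initial, List<Step<D>> steps) {
        CompletableFuture<D> chain = CompletableFuture.completedFuture(initial);
        for (Step<D> step : steps) {
            chain = chain.thenCompose(step);
        }
        return chain;
    }

    public static void main(String[] args) {
        // Records the order in which the steps are actually invoked.
        List<String> calls = new ArrayList<>();

        Step<String> trim = s -> {
            calls.add("trim");
            return CompletableFuture.completedFuture(s.trim());
        };
        Step<String> lower = s -> {
            calls.add("lower");
            return CompletableFuture.supplyAsync(() -> s.toLowerCase());
        };

        String result = execute("  HTTP://Example.COM  ", List.of(trim, lower)).join();
        System.out.println(result + " " + calls);
    }
}
```

In both versions the loop only assembles the chain; the order of execution comes from each stage depending on the previous stage's output. One difference worth keeping in mind: a Mono is lazy, so in the Reactor version nothing runs until the returned Mono is subscribed to.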