I have two simple interfaces, Processor and SeedPreProcessor, defined as follows:
Processor:
    public interface Processor<I, O> {
        Mono<O> process(I input);
    }
SeedPreProcessor:
    public interface SeedPreProcessor<D> extends Processor<D, D> {
        /**
         * Specify the location of this processor in the pipeline.
         *
         * @return the order
         */
        Integer order();
        String name();
    }
and a PipeLine class defined like this:
    public class PipeLine {
        private final List<SeedPreProcessor<PreProcessorDocument>> allProcessors;

        public PipeLine(List<SeedPreProcessor<PreProcessorDocument>> allProcessors) {
            this.allProcessors = new ArrayList<>(allProcessors);
            this.allProcessors.sort(comparingInt(SeedPreProcessor::order));
        }

        public Mono<PreProcessorDocument> execute(String url) {
            log.info("Start processing URL = {}", url);
            var initial = new PreProcessorDocument(url);
            return Flux
                .fromIterable(allProcessors)
                .map(proc -> proc.process(initial).t) // my problem is here
        }
    }
Starting from an initial PreProcessorDocument, I want to execute every SeedPreProcessor in the list allProcessors one by one, in order.
How can I achieve this?
Answer:
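A simple for-each loop combined with the flatMap operator will do the trick. Each flatMap call appends one processor to the chain, so a processor is invoked only after the previous one has emitted its result, and it receives that result as its input. Because allProcessors is already sorted in the constructor, the loop follows the order() values: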
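    public Mono<PreProcessorDocument> execute(String url) {
        var initial = new PreProcessorDocument(url);
        // Start with a Mono that simply emits the initial document.
        Mono<PreProcessorDocument> m = Mono.just(initial);
        for (SeedPreProcessor<PreProcessorDocument> p : allProcessors) {
            // Append the next processor; it receives the previous processor's output.
            m = m.flatMap(p::process);
        }
        return m;
    }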
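To see the ordering behaviour of the loop above without a Reactor dependency, here is a minimal sketch of the same fold using only the JDK. It is an analogue, not the Reactor code itself: CompletableFuture.thenCompose stands in for Mono.flatMap, String stands in for PreProcessorDocument, and the class names SequentialPipelineSketch and Step are hypothetical. One difference to keep in mind is that a CompletableFuture starts eagerly, while a Mono does nothing until it is subscribed to, so this only illustrates the sequencing.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class SequentialPipelineSketch {

    /** Hypothetical stand-in for SeedPreProcessor: an ordered asynchronous step over String. */
    static final class Step {
        final int order;
        final Function<String, CompletableFuture<String>> fn;

        Step(int order, Function<String, CompletableFuture<String>> fn) {
            this.order = order;
            this.fn = fn;
        }
    }

    /** Sorts the steps by order, then folds them into a single sequential chain. */
    static CompletableFuture<String> execute(List<Step> steps, String input) {
        List<Step> sorted = new ArrayList<>(steps);
        sorted.sort(Comparator.comparingInt((Step s) -> s.order));

        // Start from an already-completed future holding the initial value.
        CompletableFuture<String> chain = CompletableFuture.completedFuture(input);
        for (Step step : sorted) {
            // thenCompose plays the role of flatMap: the step runs only after
            // the previous stage completes, and it receives that stage's result.
            chain = chain.thenCompose(step.fn);
        }
        return chain;
    }

    public static void main(String[] args) {
        List<Step> steps = List.of(
            new Step(2, s -> CompletableFuture.supplyAsync(() -> s + "/index")),
            new Step(1, s -> CompletableFuture.completedFuture(s.toLowerCase())));

        // The step with order 1 runs first, even though it is listed second.
        System.out.println(execute(steps, "HTTP://EXAMPLE.COM").join());
        // prints "http://example.com/index"
    }
}
```

The step with order 2 is given first on purpose: the result shows that the chain follows the sorted order and that each step sees the output of the one before it, which is the same guarantee the flatMap loop gives.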