How to reduce a Flux to Mono and stop consuming the stream by a certain condition

Time:07-26

I'm trying to combine two sinks in one service in order to buffer requests.

I add each data request to reqSink, batch the requests with the "bufferTimeout" method, and fetch the response for each batch from a remote client.

When the response arrives, I publish it to resSink so that it reaches the subscribers.

Each subscriber should filter the results, keep only the data related to its own request, and return. This is the step where I have an issue: the logs show that subscribers receive the correct data in the "reduce" method, but no result is ever returned, because "reduce" emits only when its source completes and resSink never completes.

How can I return the result as soon as "reduce" has received all the necessary data, without completing resSink?

Code for service:

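    import java.time.Duration;
    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;
    import java.util.function.Function;
    import java.util.stream.Collectors;
    import lombok.extern.slf4j.Slf4j;
    import reactor.core.publisher.Flux;
    import reactor.core.publisher.Mono;
    import reactor.core.publisher.Sinks;
    
    @Slf4j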
    public class TestSinkService {
    
        public static final int MAX_BUFFER_SIZE = 2;
        public static final Duration MAX_BUFFER_TIME = Duration.ofMillis(500);
    
        private Sinks.Many<List<String>> reqSink = Sinks.many().replay().all();
        private Sinks.Many<Map<String, Double>> resSink = Sinks.many().replay().all();
    
        public void startListen() {
            reqSink
                    .asFlux()
                    .bufferTimeout(MAX_BUFFER_SIZE, MAX_BUFFER_TIME)
                    .subscribe(lists -> {
                        List<String> collect = lists.stream()
                                .flatMap(List::stream)
                                .distinct()
                                .collect(Collectors.toList());
    
                        log.info("Fetching from client {}", collect);
    
                        getDataFromClientDumb(collect)
                                .subscribe(stringDoubleMap -> {
                                    log.info("Received from client {}", stringDoubleMap);
                                    resSink.emitNext(stringDoubleMap, Sinks.EmitFailureHandler.FAIL_FAST);
                                });
                    });
    
        }
    
        public Mono<Map<String, Double>> fetchData(List<String> list) {
            Mono<Map<String, Double>> mapMono = getRes(list);
            reqSink.emitNext(list, Sinks.EmitFailureHandler.FAIL_FAST);
            return mapMono;
        }
    
    
        private Mono<Map<String, Double>> getDataFromClientDumb(List<String> list) {
            Map<String, Double> data = list.stream().collect(Collectors.toMap(Function.identity(), v -> Math.random()));
            return Mono.just(data);
        }
    
    
        private Mono<Map<String, Double>> getRes(List<String> list) {
    
            Flux<Map<String, Double>> mapFlux = resSink.asFlux();
            return mapFlux
                    .reduce(new HashMap<>(), (accu, next) -> {
                        log.info("Next {}", next);
                        // Keep only the entries this subscriber asked for.
                        for (String req : list) {
                            if (next.containsKey(req)) {
                                accu.put(req, next.get(req));
                            }
                        }
                        log.info("Accu {}", accu);
                        return accu;
                    });
        }
    }

Test which hangs forever:

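    import static org.junit.jupiter.api.Assertions.assertTrue;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.List;
    import java.util.Map;
    import java.util.concurrent.ThreadLocalRandom;
    import lombok.extern.slf4j.Slf4j;
    import org.junit.jupiter.api.Test;
    import reactor.core.publisher.Mono;
    import reactor.test.StepVerifier;
    
    @Slf4j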
    class SinkServiceTest {
    
        @Test
        void fetchDataTest() {
    
            TestSinkService testSinkService = new TestSinkService();
            testSinkService.startListen();
    
            int numberOfRequests = 5;
    
            for (int i = 0; i < numberOfRequests; i++) {
                List<String> request = generateRequest();
                Mono<Map<String, Double>> response = testSinkService.fetchData(request);
                StepVerifier.create(response).assertNext(stringDoubleMap -> {
                    log.info("Received in response {}", stringDoubleMap);
                    assertTrue(stringDoubleMap.keySet().containsAll(request));
                }).verifyComplete();
            }
        }
    
        private List<String> generateRequest() {
            List<String> countries = List.of("AL", "AD", "AM", "AT", "BY", "BE", "BA", "BG", "CH", "CY", "CZ", "DE",
                    "DK", "EE", "ES", "FO", "FI", "FR", "GB", "GE", "GI", "GR", "HU", "HR",
                    "IE", "IS", "IT", "LI", "LT", "LU", "LV", "MC", "MK", "MT", "NO", "NL", "PL",
                    "PT", "RO", "RS", "RU", "SE", "SI", "SK", "SM", "TR", "UA", "VA");
    
            countries = new ArrayList<>(countries);
            Collections.shuffle(countries);
    
            return countries.subList(0, ThreadLocalRandom.current().nextInt(1, 10));
        }
    }

Answer:

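I found a solution myself: instead of the "reduce" operator, I use the "scan" operator. It takes the same arguments, but whereas "reduce" emits only the final value once the source completes, "scan" emits every intermediate accumulation. That makes it possible to stop with "takeUntil" as soon as the accumulated map contains every requested key, and to return that map with "last", without completing resSink.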

    private Mono<Map<String, Double>> getRes(List<String> list) {
    
        Flux<Map<String, Double>> mapFlux = resSink.asFlux();
        Map<String, Double> initial = new HashMap<>();
        return mapFlux
                .scan(initial, (accu, next) -> {
                    for (String req : list) {
                        if (next.containsKey(req)) {
                            accu.put(req, next.get(req));
                        }
                    }
                    return accu;
                })
                .takeUntil(stringDoubleMap -> stringDoubleMap.keySet().containsAll(list))
                .last();
    }
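
To make the control flow easier to follow, here is a rough plain-Java model of what the scan / takeUntil / last chain does. It does not use Reactor at all: the class name `AccumulateUntilSatisfied`, the `collect` method and the sample values are made up for illustration, and an `Iterable` of batches stands in for the never-completing resSink.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical plain-Java model of the operator chain; not Reactor code.
final class AccumulateUntilSatisfied {

    static Map<String, Double> collect(List<String> requested,
                                       Iterable<Map<String, Double>> batches) {
        Map<String, Double> accu = new HashMap<>();

        // The seed is checked first: an empty request is already satisfied
        // before any batch is read.
        if (accu.keySet().containsAll(requested)) {
            return accu;
        }

        for (Map<String, Double> next : batches) {
            // "scan" step: fold the next batch into the accumulator,
            // keeping only the keys this caller asked for.
            for (String key : requested) {
                if (next.containsKey(key)) {
                    accu.put(key, next.get(key));
                }
            }
            // "takeUntil" step: stop as soon as every requested key is
            // present; returning here plays the role of "last".
            if (accu.keySet().containsAll(requested)) {
                return accu;
            }
        }
        // The source ended early: return whatever was accumulated.
        return accu;
    }

    public static void main(String[] args) {
        List<Map<String, Double>> batches = List.of(
                Map.of("AL", 1.0),
                Map.of("DE", 2.0, "FR", 9.0),
                Map.of("AL", 5.0)); // never read: the request is satisfied earlier

        System.out.println(collect(List.of("AL", "DE"), batches));
    }
}
```

The third batch is never consumed, which is the point: the caller gets its answer without the source having to end. One thing to keep in mind for the Reactor version: the `initial` map is created once per call to getRes, so if the returned Mono could be subscribed more than once, `scanWith(HashMap::new, ...)` would give each subscription its own accumulator. Whether that matters here depends on how the Mono is used.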