Correct way to await in Spring Webflux

Time:01-03

We have an HTTP endpoint that takes a command and sends it via Kafka to another service. We then MUST wait for the response, which also arrives via Kafka, before returning a ResponseEntity to the client.

  public Mono<ResponseEntity<byte[]>> post(@RequestBody Mono<byte[]> b) {
    return b.flatMap(
            identifyType -> service.requestProcess(identifyType).flatMap(this::toResponse));
  }

In the service:

public Mono<String> requestProcess(byte[] b) {
    String r = authenticationService.waitForAuthentication(parseBytes(b));
    if (StringUtils.isEmpty(r)) {
      throw new CustomException("Failed to authenticate");
    }
    return Mono.just(r);
}

In the authenticationService

public String waitForAuthentication(String data) {
    CountDownLatch countDownLatch = new CountDownLatch(1);
    Listener listener = new Listener(countDownLatch);
    //another setup
    subscribe("topic_abc", String.class, listener);
    sendAuthenticationRequestToKafka(data);
    try {
      boolean b = countDownLatch.await(2, TimeUnit.SECONDS);
      if (b) return listener.getData();
      else throw new CustomException();
    } catch (Exception e) {
      throw new CustomException();
    }
}

The listener

@RequiredArgsConstructor
public class Listener implements Consumer<String> {
  @Getter private final CountDownLatch countDownLatch;
  @Getter private String data;

  @Override
  public void accept(String data) {
    this.data = data;
    // count to 0 
    countDownLatch.countDown();
  }
}

I have read the documentation, and all of it says that we should never sleep, block, or await in a WebFlux application, because doing so blocks the current thread, which then cannot handle other HTTP requests.

But in our case, waiting is a must. We cannot change the flow above because the client is a black box. Please suggest a correct way to await in a WebFlux application/controller. Thank you.

CodePudding user response:

I have not seen a really clean way to get asynchronous-but-non-reactive data into a reactive stream. Your use of a CountDownLatch is as good as any.

However, if you use a reactive Kafka client such as the Reactor Kafka library, then you won't have to do any of the clumsy blocking.
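
If the latch has to stay for now, the usual mitigation is to keep the blocking wait off the threads that serve HTTP requests. The sketch below shows the idea with plain JDK classes only. BlockingOffload, its pool size, and the thread name are made-up for illustration and are not part of the code in the question. In Reactor the analogous move would be Mono.fromCallable(...) combined with subscribeOn(Schedulers.boundedElastic()).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

/** Hypothetical helper: runs blocking waits on a dedicated pool. */
final class BlockingOffload {

  // Dedicated daemon threads for blocking work; the size 4 is an arbitrary assumption.
  private final ExecutorService blockingPool = Executors.newFixedThreadPool(4, r -> {
    Thread t = new Thread(r, "blocking-wait");
    t.setDaemon(true);
    return t;
  });

  /** Runs the blocking call on the dedicated pool and returns immediately. */
  <T> CompletableFuture<T> offload(Supplier<T> blockingCall) {
    return CompletableFuture.supplyAsync(blockingCall, blockingPool);
  }

  void shutdown() {
    blockingPool.shutdown();
  }

  public static void main(String[] args) {
    BlockingOffload offload = new BlockingOffload();
    CountDownLatch latch = new CountDownLatch(1);

    // The latch wait happens on a "blocking-wait" thread, not on the caller.
    CompletableFuture<Boolean> arrived = offload.offload(() -> {
      try {
        return latch.await(2, TimeUnit.SECONDS);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return false;
      }
    });

    latch.countDown(); // simulates the Kafka listener receiving the reply
    System.out.println("reply arrived: " + arrived.join());
    offload.shutdown();
  }
}
```

This still ties up one pool thread per waiting request, so it only limits the damage. It does not make the wait itself non-blocking.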

UPDATE

I actually did find a relatively elegant solution that uses MonoSink.

public Mono<ResponseEntity<byte[]>> post(@RequestBody Mono<byte[]> b) {
    return b.map(String::new)
        .flatMap(data -> Mono.<String>create(sink -> {
          // complete the Mono when the Kafka reply arrives
          Consumer<String> kafkaListener = sink::success;

          subscribe("topic_abc", String.class, kafkaListener);
          sendAuthenticationRequestToKafka(data);
        }))
        // toResponse returns a Mono in the question's controller, hence flatMap
        .flatMap(this::toResponse);
}

This assumes that subscribe and sendAuthenticationRequestToKafka are in scope. If you move this logic to the service layer, that method can return Mono<String>.

Also, I don't know how Kafka handles errors, but you could create error handlers that forward those to the sink.
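
As a rough sketch of that error forwarding, together with the 2-second limit from the latch version, here is the same callback bridge written against a plain JDK CompletableFuture. ReplyBridge and Transport are hypothetical stand-ins for the subscribe and sendAuthenticationRequestToKafka helpers, whose real signatures are not shown in the question, and the error callback is an assumption about what the Kafka wrapper could offer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

final class ReplyBridge {

  /** Hypothetical transport; stands in for the subscribe/send helpers in the question. */
  interface Transport {
    void subscribe(String topic, Consumer<String> onReply, Consumer<Throwable> onError);
    void send(String data);
  }

  private final Transport transport;

  ReplyBridge(Transport transport) {
    this.transport = transport;
  }

  /** Returns a future that completes with the reply, an error, or a timeout. */
  CompletableFuture<String> requestReply(String data, long timeout, TimeUnit unit) {
    CompletableFuture<String> reply = new CompletableFuture<>();
    // Subscribe before sending so that a fast reply is not missed.
    transport.subscribe("topic_abc", reply::complete, reply::completeExceptionally);
    try {
      transport.send(data);
    } catch (RuntimeException e) {
      reply.completeExceptionally(e); // a failed send fails the future immediately
    }
    // Java 9+: fails the future with TimeoutException if nothing arrives in time.
    return reply.orTimeout(timeout, unit);
  }

  public static void main(String[] args) {
    // In-memory transport that answers every request straight away.
    Transport echo = new Transport() {
      private Consumer<String> onReply;

      @Override
      public void subscribe(String topic, Consumer<String> r, Consumer<Throwable> e) {
        onReply = r;
      }

      @Override
      public void send(String data) {
        onReply.accept("authenticated:" + data);
      }
    };
    System.out.println(new ReplyBridge(echo).requestReply("user-1", 2, TimeUnit.SECONDS).join());
  }
}
```

With Reactor on the classpath, Mono.fromFuture(...) can wrap the returned future. Inside Mono.create, the equivalent would be calling sink.error(...) from the error handler and adding .timeout(Duration.ofSeconds(2)) to the resulting Mono. Unsubscribing the listener after a reply or timeout is left out here, as it is in the original.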
