Home > Blockchain >  Flux.just with delayElements works with Strings but not with Custom types
Flux.just with delayElements works with Strings but not with Custom types

Time:10-09

I want to build a reactive REST endpoint with API returning Employee data(5 employees) with delay of 2 seconds each.

@RequestMapping(value = "/strings", method = RequestMethod.GET)
    public Flux<Employee> getStringsStream() {
        return Flux.just(new Employee("Alok"),
                new Employee("Ashish"),
                new Employee("Neeraj"),
                new Employee("Shantanu"),
                new Employee("Gaurav")).delayElements(Duration.ofSeconds(2));
    }
}

record Employee(String name) {}

It behaves like blocking API and takes total 2*5 = 10 seconds, and renders whole data at once. While it works fine (proper reactively) if I just return Employee names (String) data. Data get rendered at 2 seconds for each name sequentially.

@RequestMapping(value = "/strings", method = RequestMethod.GET)
    public Flux<String> getStringsStream() {
        return Flux.just("Alok",
                "Ashish",
                "Neeraj"
                ,"Shantanu",
                "Gaurav").delayElements(Duration.ofSeconds(2));
    }

I am new to reactive programming. Am I missing any concept here?

CodePudding user response:

Different return types use different response encoders in Spring Webflux.

While Flux<String> will trigger CharSequenceEncoder, a Flux of custom Java objects like Employee will trigger Jackson2JsonEncoder. This results in different behaviour for the different return types.

As documented in Spring Webflux reference:

The Jackson2Encoder works as follows:

  • For a single value publisher (e.g. Mono), simply serialize it through the ObjectMapper.
  • For a multi-value publisher with application/json, by default collect the values with Flux#collectToList() and then serialize the resulting collection.
  • For a multi-value publisher with a streaming media type such as application/x-ndjson or application/stream x-jackson-smile, encode, write, and flush each value individually using a line-delimited JSON format. Other streaming media types may be registered with the encoder.
  • For SSE the Jackson2Encoder is invoked per event and the output is flushed to ensure delivery without delay.

As you can see the default behaviour is serializing it as a JSON array. However, you can customize the behaviour by specifying the produces attribute in the request mapping annotation. The following for example will achieve your desired behaviour:

@GetMapping(value = "/json-stream", produces = TEXT_EVENT_STREAM_VALUE)
public Flux<Employee> getJsonStream() {
  • Related