Cannot save more than 256 records R2DBC: Spring WebFlux Cannot exchange messages because the request

Time:06-21

I have the code below, which inserts data into a Postgres database:

if (employeeList == null || employeeList.isEmpty())
    return Flux.empty();

return Flux.fromIterable(employeeList)
        .doOnNext(employee -> {
            employee.setParentResourceId(parentResourceId);
            employee.setCreatedBy(userId);
        })
        .map(employeeMapper::entityFromModel)
        .flatMap(employeeRepository::save)
        .map(employeeMapper::fromEntity);

The code above works fine when employeeList has 256 or fewer elements. When I try to insert more than 256 records, I get a "Cannot exchange messages because the request queue limit is exceeded" exception.

I have researched quite a bit and replaced the code above with the following:

Flux<Employee> employeeFlux = Flux.create(emitter -> {
    for (Employee employee : employeeList) {
        emitter.next(employee);
    }
    // Signal completion; without this the Flux never terminates.
    emitter.complete();
}, FluxSink.OverflowStrategy.BUFFER);

return employeeFlux
        .doOnNext(employee -> {
            employee.setParentResourceId(parentResourceId);
            employee.setCreatedBy(userId);
        })
        .map(employeeMapper::entityFromModel)
        .flatMap(employeeRepository::save)
        .map(employeeMapper::fromEntity);

This change didn't help; I still get the same exception.

Is there any other strategy to handle this situation in WebFlux? I want the code to stay reactive.

CodePudding user response:

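Instead of flatMap, which subscribes to the inner publishers concurrently (up to 256 at a time by default in Reactor), you should use concatMap, which subscribes to them one at a time and in order, so each save finishes before the next one starts. See this issue, which explains that in a bit more detail.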

return Flux.fromIterable(employeeList)
            .doOnNext(employee -> {
                employee.setParentResourceId(parentResourceId);
                employee.setCreatedBy(userId);
            })
            .map(employeeMapper::entityFromModel)
            .concatMap(employeeRepository::save)
            .map(employeeMapper::fromEntity); 
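
If fully sequential saves turn out to be too slow, a possible middle ground is the flatMap overload that takes an explicit concurrency argument. The sketch below is only an illustration and assumes reactor-core is on the classpath. The class name BoundedSaveSketch, the fakeSave method (a stand-in for employeeRepository.save) and the concurrency value 16 are all hypothetical, so pick a value that suits your driver and connection setup. Note that flatMap, unlike concatMap, does not guarantee that results come back in the original order.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

public class BoundedSaveSketch {

    // Counts fake saves that have started but not yet produced a result.
    static final AtomicInteger inFlight = new AtomicInteger();
    // Highest value of inFlight observed so far.
    static final AtomicInteger peak = new AtomicInteger();

    // Hypothetical stand-in for employeeRepository.save(...):
    // emits the id after a short delay, like a non-blocking database call.
    static Mono<Integer> fakeSave(Integer id) {
        return Mono.defer(() -> {
            int now = inFlight.incrementAndGet();
            peak.accumulateAndGet(now, Math::max);
            return Mono.delay(Duration.ofMillis(5))
                    .map(tick -> {
                        inFlight.decrementAndGet();
                        return id;
                    });
        });
    }

    // Saves every id while keeping at most `concurrency` saves subscribed at once.
    // flatMap(fn, 1) would behave much like concatMap: one save at a time.
    static List<Integer> saveAll(List<Integer> ids, int concurrency) {
        return Flux.fromIterable(ids)
                .flatMap(BoundedSaveSketch::fakeSave, concurrency)
                .collectList()
                .block(); // blocking only to keep the demo simple; avoid in WebFlux handlers
    }

    public static void main(String[] args) {
        List<Integer> ids = IntStream.range(0, 1000).boxed().collect(Collectors.toList());
        List<Integer> saved = saveAll(ids, 16);
        System.out.println("saved=" + saved.size() + ", peak in-flight=" + peak.get());
    }
}
```

In the original pipeline this would correspond to .flatMap(employeeRepository::save, 16) in place of .concatMap(employeeRepository::save). Whether a given value stays under the driver's request queue limit depends on the setup, so treat the number as something to tune and test.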