I have the code below, which inserts data into a Postgres database:
if (employeeList == null || employeeList.isEmpty())
    return Flux.empty();
return Flux.fromIterable(employeeList)
        .doOnNext(employee -> {
            employee.setParentResourceId(parentResourceId);
            employee.setCreatedBy(userId);
        })
        .map(employeeMapper::entityFromModel)
        .flatMap(employeeRepository::save)
        .map(employeeMapper::fromEntity);
The code above works fine when employeeList has 256 or fewer elements. When I try to insert more than 256 records, I get this exception:
Cannot exchange messages because the request queue limit is exceeded
I have researched this quite a bit and replaced the code above with the code below:
Flux<Employee> employeeFlux = Flux.create(emitter -> {
    for (Employee employee : employeeList) {
        emitter.next(employee);
    }
}, FluxSink.OverflowStrategy.BUFFER);
return employeeFlux
        .doOnNext(employee -> {
            employee.setParentResourceId(parentResourceId);
            employee.setCreatedBy(userId);
        })
        .map(employeeMapper::entityFromModel)
        .flatMap(employeeRepository::save)
        .map(employeeMapper::fromEntity);
This change didn't help; I still get the same exception.
Is there any other strategy to handle this situation in WebFlux? I want the code to stay reactive.
CodePudding user response:
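Instead of flatMap, which subscribes to the inner publishers concurrently (up to 256 at a time by default), you should use concatMap, which subscribes to them one at a time and waits for each save to complete before starting the next. See this issue, which explains that in a bit more detail.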
return Flux.fromIterable(employeeList)
        .doOnNext(employee -> {
            employee.setParentResourceId(parentResourceId);
            employee.setCreatedBy(userId);
        })
        .map(employeeMapper::entityFromModel)
        .concatMap(employeeRepository::save)
        .map(employeeMapper::fromEntity);
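
To make the difference concrete, here is a small plain-Java sketch of the idea behind the fix: capping how many saves are in flight at once. It deliberately does not use Reactor, so it runs with only the JDK, and it is a toy model rather than how Reactor or the driver is implemented. The class InFlightDemo and the methods peakInFlight and fakeSave are hypothetical names made up for this illustration; fakeSave merely stands in for employeeRepository::save. Unlike Reactor, this sketch blocks the submitting thread while it waits for a free slot.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class: models "at most N saves in flight", nothing more.
final class InFlightDemo {

    /**
     * Runs `total` fake saves with at most `maxConcurrent` running at once
     * and returns the highest number that were observed in flight together.
     * maxConcurrent = 1 behaves like the one-at-a-time (concatMap-style) case.
     */
    public static int peakInFlight(int total, int maxConcurrent) {
        Semaphore permits = new Semaphore(maxConcurrent);
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        ExecutorService pool = Executors.newCachedThreadPool();
        List<CompletableFuture<Void>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < total; i++) {
                // Block here until one of the maxConcurrent slots is free.
                permits.acquireUninterruptibly();
                pending.add(CompletableFuture.runAsync(() -> {
                    try {
                        int now = inFlight.incrementAndGet();
                        peak.accumulateAndGet(now, Math::max);
                        fakeSave();
                    } finally {
                        inFlight.decrementAndGet();
                        permits.release(); // free the slot for the next save
                    }
                }, pool));
            }
            // Wait for every submitted save to finish.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture[0])).join();
        } finally {
            pool.shutdown();
        }
        return peak.get();
    }

    // Stand-in for a database insert (assumption: it just sleeps briefly).
    static void fakeSave() {
        try {
            Thread.sleep(2);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("one at a time, peak in flight: " + peakInFlight(50, 1));
        System.out.println("up to 8 at once, peak in flight: " + peakInFlight(50, 8));
    }
}
```

With a limit of 1 the peak is always 1, which is the behaviour concatMap gives you. If strictly sequential inserts turn out to be too slow, Reactor also has a flatMap overload that takes a concurrency argument, for example .flatMap(employeeRepository::save, 8), which keeps some parallelism while bounding the number of pending saves. The value 8 is only an illustration, not a recommendation; the right number depends on your connection pool and driver.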