I have an issue with entities not being saved by Spring Data. The application logic goes like this:
- Another application listens to heavily loaded Kafka topics (tens of messages per second) and inserts the messages into a database table with a 'NEW' status.
- A @Scheduled method loads a List of entities with 'NEW' status. Each entity gets its status set to 'PROCESSING' and is handed, one by one, to a FixedThreadPool (20 threads); after that, saveAll() is called on the same table for the whole batch:
```java
@Scheduled(fixedDelay = 10_000)
public void scheduled() {
    Pageable pageable = PageRequest.of(0, pageSize);
    List<Entity> entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
    while (!entities.isEmpty()) {
        entities.forEach(entity -> {
            entity.setStatus(PROCESSING_STATUS);
            process(entity); // hands the entity to the thread pool
        });
        entityService.saveAll(entities); // persists PROCESSING for the whole page
        entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
    }
}

private void process(Entity entity) {
    threadPool.execute(
            () -> processor.execute(
                    TaskBuilder.entity(entity).build()
            )
    );
}
```
- In a FixedThreadPool thread, a TaskProcessor runs the business logic through several classes, changing each entity's status to 'OK' or 'ERROR'.
- The last step of the business logic saves the entity with its resulting status to the same table it was pulled from, and that is where the problem appears: some of the entities are never persisted and stay in 'PROCESSING' status forever. Here is the end of that last step:
```java
...
log.info("[SaveStep] [execute] status before {}", entity.getStatus());
entity = entityService.save(entity);
log.info("[SaveStep] [execute] status after {}", entity.getStatus());
```
Both log statements show the correct status ('OK' or 'ERROR'), but the database row still contains 'PROCESSING'. I've tried several variations of entityService.save() (the repository is a regular JpaRepository):
- A regular repository.save() doesn't work:
```java
public Entity save(Entity entity) {
    return repository.save(entity);
}
```
- Fetching, updating and saving doesn't work either:
```java
@Transactional
public Entity save(Entity entity) {
    Entity saved = repository.getFirstById(entity.getId());
    saved.setStatus(entity.getStatus());
    return repository.save(saved);
}
```
- I thought the Hikari pool size might be the issue, but increasing maximum-pool-size to 20 and even 30 changed nothing.
- If I add logging around the second variant, both log lines show the correct status, but the database does not:
```java
@Transactional
public Entity save(Entity entity) {
    Entity saved = repository.getFirstById(entity.getId());
    log.info("[Service] [save] status before {}", saved.getStatus());
    saved.setStatus(entity.getStatus());
    log.info("[Service] [save] status after {}", saved.getStatus());
    return repository.save(saved);
}
```
- I've also tried a native query, with no success:
```java
@Modifying
@Transactional
@Query(value = "UPDATE entity_table_name SET status = :status WHERE id = :id", nativeQuery = true)
void updateStatus(@Param("id") Long id, @Param("status") String status);
```
- Replacing repository.save() with repository.saveAndFlush() had no effect either.
At this point I'm out of ideas. The most frustrating part is that most of the entities do end up in the database correctly; only about 20% are silently lost by save(), without any exception being thrown.
Answer:
You're saving the 'PROCESSING' status only after the entities have been submitted to the pool, so a worker thread may finish with an entity first, and the 'PROCESSING' update can then land afterwards and overwrite the result. Not sure if this is your problem, but it's a possibility with what you're doing. Try setting and saving/committing the 'PROCESSING' update before submitting to the pool.
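The interleaving described above can be reproduced without Spring or a database. The sketch below is a hypothetical stand-in, not the asker's code: a ConcurrentHashMap plays the role of the table, one pool task plays the worker's final save(), and a CountDownLatch forces the unlucky ordering in which the worker's 'OK' is written before the scheduler's 'PROCESSING' save.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class LostUpdateDemo {

    // Hypothetical stand-in for the database table: id -> status
    static final Map<Long, String> table = new ConcurrentHashMap<>();

    static String run() throws InterruptedException {
        table.clear();
        table.put(1L, "NEW");
        ExecutorService pool = Executors.newFixedThreadPool(20);
        CountDownLatch workerSaved = new CountDownLatch(1);

        // Scheduler: submit the entity to the pool first (process(entity) in the question)...
        pool.execute(() -> {
            table.put(1L, "OK");       // the worker's final save()
            workerSaved.countDown();
        });

        // ...then persist PROCESSING (the saveAll() call in the question).
        // The latch forces the worker to finish first, i.e. the unlucky ordering.
        workerSaved.await();
        table.put(1L, "PROCESSING");

        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return table.get(1L);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("final status: " + run());
    }
}
```

Running it prints `final status: PROCESSING`: the later write simply wins and no exception is raised, which matches the silent loss described in the question. In the real application no latch is needed; the same ordering occurs whenever a worker finishes before saveAll() runs.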
Answer (from the asker):
Thanks to Taylor, who correctly pointed out my issue. The entities were processed in parallel and saved with their final status so quickly that this sometimes happened before the scheduler reached its saveAll() call with 'PROCESSING', which then overwrote the final status.
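A minimal sketch of the suggested fix, under the same assumptions (an in-memory map stands in for the table, and `saveAll` here is a hypothetical helper, not Spring Data's method): the whole batch is marked and persisted as 'PROCESSING' first, and only then are the tasks submitted, so the only writes that can follow come from the workers. In the real application the equivalent would be calling entityService.saveAll(entities) before the loop that calls process(entity), and making sure that save is actually committed at that point (for example, that scheduled() is not itself wrapped in a transaction that commits only when the method returns).

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class FixedOrderingDemo {

    // Hypothetical stand-in for the database table: id -> status
    static final Map<Long, String> table = new ConcurrentHashMap<>();

    // Hypothetical stand-in for entityService.saveAll(): writes every status in the batch
    static void saveAll(Map<Long, String> batch) {
        table.putAll(batch);
    }

    static Map<Long, String> run(int batchSize) throws InterruptedException {
        table.clear();
        List<Long> ids = new ArrayList<>();
        for (long i = 1; i <= batchSize; i++) {
            table.put(i, "NEW");
            ids.add(i);
        }

        // 1. Mark the batch PROCESSING and persist it BEFORE any task is submitted.
        Map<Long, String> processing = new HashMap<>();
        for (Long id : ids) {
            processing.put(id, "PROCESSING");
        }
        saveAll(processing);

        // 2. Only now hand the entities to the pool; each worker writes its final status.
        ExecutorService pool = Executors.newFixedThreadPool(20);
        for (Long id : ids) {
            pool.execute(() -> table.put(id, "OK"));
        }

        pool.shutdown();
        if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
            throw new IllegalStateException("workers did not finish in time");
        }
        return new HashMap<>(table);
    }

    public static void main(String[] args) throws InterruptedException {
        Map<Long, String> result = run(1_000);
        long ok = result.values().stream().filter("OK"::equals).count();
        System.out.println(ok + " of " + result.size() + " rows are OK");
    }
}
```

Because nothing writes 'PROCESSING' after submission, every row ends in the worker's status regardless of how fast the pool is; this run prints `1000 of 1000 rows are OK`.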