I need to read data from the database, send them to Kafka
, and then delete those data (which were successfully sent) from the database. I would think to do it straitforward:
public void syncData() {
List<T> data = repository.findAll();
data.forEach(value -> kafkaTemplate.send(topicName, value));
repository.deleteAll(data);
}
But I have never worked with Kafka before and I have a confusion with kafkaTemplate.send
operation. As the method returns ListenableFuture
that means that the iteration data.forEach
might be finished before all the messages are really sent to a broker. Thus, I might delete the data before they are really sent. What if, for some reason, some messages are not sent. Say I have 10 data, and starting from 7th the broker gets down.
- Will
Kafka
throw an exception if a message is not sent? - Should I introduce an extra logic to ensure that all messages are sent before going to the next stage of deleting the data?
P.S. I use Kafka
with Spring-boot
CodePudding user response:
One of the strategies would be to keep those Future objects that are returned and monitor them (possibly in a separate thread). Once all of the tasks complete you can either delete the records that were successfully sent or write the IDs that need to be deleted in DB. And then have a scheduled task (once per hour or day or whatever period that fits you) that would delete all the ids that should be deleted.
CodePudding user response:
- You should implement a callback that will trigger when the producer either succeeds or fails to write the data to Kafka before deleting it from the DB.
https://docs.spring.io/spring-kafka/docs/1.0.0.M2/reference/html/_reference.html
On top of this, you can set required acks to ALL so that every broker acknowledges the messages before it's considered sent.
Also little tid bit worth knowing in this context -
Acks=ALL
is not all assigned replicas, it's all in sync replicas for the partition need to acknowledge the write. So, it's important to have your min isr settings sensible for this also. If you have min isr = one, in a very strict sense Acks=all is still only guaranteeing that 1 broker saw the write. If you then lose that one broker, you lose the write. That's obviously not going to be a common situation, but it's one that you should be aware of.The usage of
outbox
pattern. (as the safe way of doing this)Also there's some directions that might be helpful are, investigate how the replication factor of a topic relays to the amount of brokers. Get in touch with the
min.insync.replicas
broker setting. Then read on theack-setting
for theclient-(producer)
and what it means in terms of communication with the broker. For restarting at the correct data position when something bad happens to your application or database connection, you can get some inspiration from thekafka-connect
library (and maybe use this as a separately deployeddb-polling-service
).