Safely processing data coming from a KafkaListener

I'm implementing a Spring Boot app that reads data from Kafka and serves it to all requesting clients. Let's say I have the following class:

@Component
public class DataProvider {
    
    private Prices prices;
    
    public DataProvider() {
        this.prices = Prices.of();
    }
    
    public Prices getPrices() {
        return prices;
    }
}

Each client may call GET /api/prices to get the newest prices. Live price updates are consumed from Kafka. Because an update arrives only every 5 seconds, which is not very often, the topic has a single partition.
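
For context, the read side could be a plain REST endpoint; the controller below is my assumption and not part of the original code:

@RestController
public class PricesController {

    private final DataProvider dataProvider;

    public PricesController(DataProvider dataProvider) {
        this.dataProvider = dataProvider;
    }

    // each client calls GET /api/prices and receives the latest snapshot
    @GetMapping("/api/prices")
    public Prices getPrices() {
        return dataProvider.getPrices();
    }
}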

I tried the most basic option, using a Kafka listener:

@Component
public class DataProvider {

    private Prices prices;

    public DataProvider() {
        this.prices = Prices.of();
    }

    public Prices getPrices() {
        return prices;
    }

    @KafkaListener(topics = "test-topic")
    public void consume(String message) {
        Prices prices = Prices.of(message);
        this.prices = prices;
    }
}

Is this approach safe?

CodePudding user response:

The prices field must be volatile; otherwise there is no guarantee that HTTP request threads will see the reference written by the Kafka listener thread. Beyond that, you need to decide whether it is acceptable for the data to diverge between requests: one HTTP request may return one snapshot while a concurrent one returns another, simply because the Kafka consumer has just updated the field.
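
A minimal sketch of that change, keeping the class from the question and only making the field volatile:

@Component
public class DataProvider {

    // volatile makes the reference written by the Kafka listener thread
    // immediately visible to the HTTP request threads calling getPrices()
    private volatile Prices prices = Prices.of();

    public Prices getPrices() {
        return prices;
    }

    @KafkaListener(topics = "test-topic")
    public void consume(String message) {
        // build the new snapshot first, then publish it with a single volatile write
        this.prices = Prices.of(message);
    }
}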

You could also make consume() and getPrices() synchronized, so everyone sees the current data at any given moment. However, the calls will no longer run in parallel, since synchronized allows only one thread to hold the object's monitor at a time.
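
A sketch of the synchronized variant, again reusing the class from the question:

@Component
public class DataProvider {

    private Prices prices = Prices.of();

    // reader and writer share the same monitor, so GET requests
    // serialize behind each other and behind consume()
    public synchronized Prices getPrices() {
        return prices;
    }

    @KafkaListener(topics = "test-topic")
    public synchronized void consume(String message) {
        this.prices = Prices.of(message);
    }
}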

Another option for consistency is a ReadWriteLock: getPrices() calls can run in parallel under the read lock, but while consume() holds the write lock, all readers are blocked until the update is done.
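
A sketch with java.util.concurrent.locks.ReentrantReadWriteLock; everything except the lock itself is taken from the question:

import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

@Component
public class DataProvider {

    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private Prices prices = Prices.of();

    public Prices getPrices() {
        lock.readLock().lock();   // many readers may hold the read lock concurrently
        try {
            return prices;
        } finally {
            lock.readLock().unlock();
        }
    }

    @KafkaListener(topics = "test-topic")
    public void consume(String message) {
        Prices updated = Prices.of(message);
        lock.writeLock().lock();  // exclusive: readers wait only while the reference is swapped
        try {
            this.prices = updated;
        } finally {
            lock.writeLock().unlock();
        }
    }
}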

So, technically your code is safe once the field is volatile; the only remaining question is whether it is safe from a business point of view.
