Kafka Streams application producing topic with same message

Time:01-31

I am facing an issue with my Kafka Streams application: messages are being processed multiple times, and the result topic constantly receives the same messages. The issue only occurs in production, not in my local environment. Can you help me determine the root cause, based on the transformer code?

@Override
public KeyValue<String, UserClicks> transform(final String user, final Long clicks) {

                UserClicks userClicks = tempStore.get(user);

                if (userClicks != null) {
                    userClicks.clicks += clicks;
                }
                else {
                    final String region = regionStore.get(user).value(); // TODO: handle the case where the data is not in the store
                    userClicks = new UserClicks(user, region, clicks);
                }

                if (userClicks.clicks < CLICKS_THRESHOLD) {
                    tempStore.put(user, userClicks);
                }
                else {
                    tempStore.delete(user);
                }

                return KeyValue.pair(user, userClicks);
            }


When I remove the KStore from the transformer, everything seems to work fine.

CodePudding user response:

Usually this problem occurs because Kafka Streams can't save its state, so it keeps reading the same batch of messages. The KStore keeps its state in a changelog topic, and it writes that state by producing messages. If the producer can't produce for some reason, the new offset can never be committed.
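One way the changelog write can fail, sketched under an assumption: if the producer waits for all in-sync replicas (acks=all), the broker only accepts the write when the number of in-sync replicas is at least min.insync.replicas. The class and method names below are hypothetical; this models the rule and is not Kafka's own code.

```java
public class IsrCheck {

    // Hypothetical helper: models the broker-side rule for acks=all writes.
    // A write is accepted only if enough replicas are currently in sync.
    static boolean canAcceptWrite(int inSyncReplicas, int minInSyncReplicas) {
        return inSyncReplicas >= minInSyncReplicas;
    }

    public static void main(String[] args) {
        // A topic with replication factor 1 can never have more than 1 in-sync replica.
        System.out.println("RF=1, min.insync.replicas=2: " + canAcceptWrite(1, 2));
        System.out.println("RF=1, min.insync.replicas=1: " + canAcceptWrite(1, 1));
        System.out.println("RF=2, min.insync.replicas=2: " + canAcceptWrite(2, 2));
    }
}
```

Under that assumption, a changelog topic with fewer replicas than min.insync.replicas would reject every write, which would match the "offset never committed" symptom.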

To resolve the issue, either set min.insync.replicas to 1 on the changelog topic, or raise the topic's replication factor to 2 so that it can satisfy a min.insync.replicas of 2. By default, Kafka Streams has historically created its internal topics with a replication factor of 1. An easy way to configure this is through Conduktor: go to the topic config and change the min.insync.replicas property. It can also be done through the Kafka CLI by running this command.

kafka-configs.sh --bootstrap-server localhost:9092 --alter --entity-type topics --entity-name configured-topic --add-config min.insync.replicas=1
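For the other option, raising the replication factor, here is a minimal sketch of the Streams-side configuration. It uses plain string keys so it runs without the Kafka jars; "replication.factor" is the key behind StreamsConfig.REPLICATION_FACTOR_CONFIG. The application id and class name are made up for illustration, and the bootstrap address is just the one from the command above.

```java
import java.util.Properties;

public class StreamsReplicationConfig {

    // Builds the properties that would be passed to new KafkaStreams(topology, props).
    static Properties buildConfig() {
        Properties props = new Properties();
        props.put("application.id", "user-clicks-app");    // hypothetical application id
        props.put("bootstrap.servers", "localhost:9092");  // same address as the CLI example
        // Replication factor for the internal topics (changelog, repartition)
        // that Kafka Streams creates. Needs at least 2 brokers in the cluster.
        props.put("replication.factor", "2");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildConfig();
        System.out.println("replication.factor = " + props.getProperty("replication.factor"));
    }
}
```

As far as I know, this setting is applied when the internal topics are created, so a changelog topic that already exists would probably still need its replicas or its min.insync.replicas changed on the broker side.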