Home > Software engineering >  Kafka streams errors after redeploying Tomcat
Kafka streams errors after redeploying Tomcat

Time:02-18

I am using kafka streams in my project. I compile my project as war and run it in tomcat.

My project works as I want without any errors. If I first stop tomcat and then start it, it works without error. However, if I redeploy(undeploy and deploy) the service without stopping tomcat, I start getting errors. When I do research, there is information that tomcat caches the old version of the service. I could not reach a solution even though I applied some solutions. I will be grateful if you could help me.

I want to say it again. My code block works normally. If I run the service for the first time in tomcat, I don't get an error. Or if I close tomcat completely and start it again, I don't get an error. However, if I redeploy(undeploy and deploy) the service without stopping tomcat , I start getting an error.

I am sharing a small code block below.

Properties streamConfiguration = kafkaStreamsConfiguration.createStreamConfiguration(createKTableGroupId(), new AppSerdes.DataWrapperSerde());
StreamsBuilder streamsBuilder = new StreamsBuilder();
KTable<String, DataWrapper> kTableDataWrapper = streamsBuilder.table(topicAction.getTopicName());
KTable<String, DataWrapper> kTableWithStore = kTableDataWrapper.filter((key, dataWrapper) -> key != null && dataWrapper != null, Materialized.as(createStoreName()));

kTableWithStore.toStream().filter((key, dataWrapper) -> // Filter)
        .mapValues((ValueMapperWithKey<String, DataWrapper, Object>) (key, dataWrapper) -> {
            // Logics
        })
        .to(createOutputTopicName());

this.kafkaStreams = new KafkaStreams(streamsBuilder.build(), streamConfiguration);
this.kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    if (kafkaStreams != null) {
        kafkaStreams.close();
    }
}));


public Properties createStreamConfiguration(String appId, Serde serde) {
    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, appId);
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, serde.getClass());
    properties.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, dynamicKafkaSourceTopologyConfiguration.getkTableCommitIntervalMs());
    properties.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, dynamicKafkaSourceTopologyConfiguration.getkTableMaxByteBufferMB() * 1024 * 1024);
    properties.put(StreamsConfig.STATE_DIR_CONFIG, KafkaStreamsConfigurationConstants.stateStoreLocation);
    return properties;

}

Error :

2022-02-16 14:19:39.663  WARN 9529 --- [     Thread-462] o.a.k.s.p.i.StateDirectory               : Using /tmp directory in the state.dir property can cause failures with writing the checkpoint file due to the fact that this directory can be cleared by the OS
2022-02-16 14:19:39.677 ERROR 9529 --- [     Thread-462] o.a.k.s.p.i.StateDirectory               : Unable to obtain lock as state directory is already locked by another process
2022-02-16 14:19:39.702 ERROR 9529 --- [     Thread-462] f.t.s.c.- Message : Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory - Localized Message : Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory - Print Stack Trace : org.apache.kafka.streams.errors.StreamsException: Unable to initialize state, this can happen if multiple instances of Kafka Streams are running in the same state directory
        at org.apache.kafka.streams.processor.internals.StateDirectory.initializeProcessId(StateDirectory.java:186)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:681)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:657)
        at org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:567)

CodePudding user response:

I think this is because

Runtime.getRuntime().addShutdownHook(new Thread(() -> {
    if (kafkaStreams != null) {
        kafkaStreams.close();
    }
}));

is not being called during re-deploy, as JVM process continue to run. Please try another way to be notified when your application is being redeployed, for example using ServletContextListener

CodePudding user response:

My problem was solved thanks to @udalmik.

I solved my problem by extending my beans from DisposableBean.

Additionally I have prototype beans. This solution didn't work on my prototype beans. I am writing my solution for both prototype and singleton beans.

// For Singleton Bean
@Service
public class PersonSingletonBean implements DisposableBean {
    @Override
    public void destroy() throws Exception {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
    }
}

// For PrototypeBean
@Service
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class PersonPrototypeBean implements DisposableBean {

    @Override
    public void destroy() {
        if (kafkaStreams != null) {
            kafkaStreams.close();
        }
    }

}

@Service
public class PersonPrototypeBeanList implements DisposableBean {

    private final List<PersonPrototypeBean> personPrototypeBeanList = Collections.synchronizedList(new ArrayList<>());

    public void addToPersonPrototypeBeanList(PersonPrototypeBean personPrototypeBean) {
        personPrototypeBeanList.add(personPrototypeBean);
    }

    public void destroy() throws Exception {
        synchronized (personPrototypeBeanList) {
            for (PersonPrototypeBean personPrototypeBean : personPrototypeBeanList) {
                if (personPrototypeBean != null) {
                    ((DisposableBean) personPrototypeBean).destroy();
                }
            }
            personPrototypeBeanList.clear();
        }
    }
}
  • Related