How to solve InvalidTopicException with multiplexed input topics in Spring Cloud Stream Kafka Stream

Time:06-21

I wrote a Spring Cloud Stream application using the Kafka Streams binder that multiplexes multiple Kafka input topics into one stream with:

spring:
  cloud:
    stream:
      bindings:
        process-in-0:
          destination: test.topic-a,test.topic-b

(Source: https://spring.io/blog/2019/12/03/stream-processing-with-spring-cloud-stream-and-apache-kafka-streams-part-2-programming-model-continued)

But whenever I configure more than one topic in the input destination (separated by commas), the following error occurs:

2022-06-17 14:07:07.648  INFO --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer     : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Subscribed to topic(s): test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition, test.topic-a,test.topic-b  
2022-06-17 14:07:07.660  WARN --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Error while fetching metadata with correlation id 2 : {test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition=UNKNOWN_TOPIC_OR_PARTITION, test.topic-a,test.topic-b=INVALID_TOPIC_EXCEPTION}  
2022-06-17 14:07:07.660 ERROR --- [-StreamThread-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Metadata response reported invalid topics [test.topic-a,test.topic-b]  
2022-06-17 14:07:07.660  INFO --- [-StreamThread-1] org.apache.kafka.clients.Metadata        : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Cluster ID: XYZ 
2022-06-17 14:07:07.663 ERROR --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams    : stream-client [test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.   

org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627) ~[kafka-streams-3.2.0.jar:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) ~[kafka-streams-3.2.0.jar:na]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]

I tried with the following dependencies:

implementation 'org.apache.kafka:kafka-clients:3.2.0'
implementation 'org.apache.kafka:kafka-streams:3.2.0'
implementation "org.springframework.cloud:spring-cloud-stream"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
implementation "org.springframework.kafka:spring-kafka"

When I only set one input topic, everything works fine.

I cannot determine what causes the InvalidTopicException: the topic names contain only permitted characters, and the comma separator appears to be correct (other separators produce different exceptions).

Answer:

Actually, right after posting the question I found a solution (or at least a workaround) myself. Here it is for future reference:

Apparently, multiplexing input topics is not supported when the processor function takes a KTable as its input type. When I change the input type in the function signature to KStream, it works:

Not working:

@Bean
public Function<KTable<String, Object>, KStream<String, Object>> process() {
  return stringObjectKTable ->
    stringObjectKTable
      .mapValues(...

Working:

@Bean
public Function<KStream<String, Object>, KStream<String, Object>> process() {
  return stringObjectKStream ->
    stringObjectKStream 
      .toTable()
      .mapValues(...
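
A possible explanation, offered as a sketch and not as a confirmed reading of the binder source: the log in the question reports `test.topic-a,test.topic-b` as one single invalid topic, which suggests that the destination string is split on commas for a KStream input but passed through unchanged for a KTable input. That would fit the Kafka Streams API, where `StreamsBuilder.stream` has an overload taking a collection of topics while `StreamsBuilder.table` takes exactly one topic name. The class and method names below are hypothetical and only model that assumed difference in plain Java, without any Kafka dependency.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical model of how a binding destination might be turned into topic names.
// This is NOT the binder's code; it only illustrates the assumed behaviour.
public class DestinationResolution {

    // Assumed KStream behaviour: split the destination on commas, one topic per entry.
    static List<String> topicsForKStream(String destination) {
        return Arrays.stream(destination.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toList());
    }

    // Assumed KTable behaviour: the whole destination is used as one topic name.
    static List<String> topicsForKTable(String destination) {
        return List.of(destination);
    }

    public static void main(String[] args) {
        String destination = "test.topic-a,test.topic-b";
        // Two separate topic names.
        System.out.println("KStream: " + topicsForKStream(destination));
        // One topic name that still contains the comma.
        System.out.println("KTable:  " + topicsForKTable(destination));
    }
}
```

Under this assumption, reading the topics as a KStream and calling `toTable()` afterwards, as in the working variant, avoids ever handing the comma-separated string to Kafka as a topic name.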

I am not sure whether this is expected behaviour or whether something else is wrong, so I would appreciate any hints about the underlying cause.
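
As for why the name is rejected: Kafka topic names may contain only ASCII letters, digits, `.`, `_` and `-`, and may be at most 249 characters long. The log in the question shows `test.topic-a,test.topic-b` being looked up as one topic name, and the comma in it is not a legal character. The following stand-alone sketch reproduces that rule; `TopicNameCheck` and `isLegalTopicName` are hypothetical names chosen for illustration and are not part of the Kafka client.

```java
import java.util.regex.Pattern;

// Minimal re-implementation of Kafka's topic naming rule, for illustration only.
public class TopicNameCheck {

    // Legal characters: ASCII alphanumerics, '.', '_' and '-'.
    private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");
    private static final int MAX_LENGTH = 249;

    static boolean isLegalTopicName(String name) {
        return name != null
                && !name.equals(".")      // "." and ".." are reserved
                && !name.equals("..")
                && name.length() <= MAX_LENGTH
                && LEGAL.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isLegalTopicName("test.topic-a"));              // true
        System.out.println(isLegalTopicName("test.topic-a,test.topic-b")); // false, contains ','
    }
}
```

Each topic on its own passes the check, so the names themselves are fine; only the combined string fails.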
