I wrote a Spring Cloud Streams Kafka Streams Binder application that has multiple Kafka input topics multiplexed to one stream with:
spring:
cloud:
stream:
bindings:
process-in-0:
destination: test.topic-a,test.topic-b
But whenever I set up more than one topic in the input destination (separated by comma), the following error occurs:
2022-06-17 14:07:07.648 INFO --- [-StreamThread-1] o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Subscribed to topic(s): test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition, test.topic-a,test.topic-b
2022-06-17 14:07:07.660 WARN --- [-StreamThread-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Error while fetching metadata with correlation id 2 : {test-processor-KTABLE-AGGREGATE-STATE-STORE-0000000005-repartition=UNKNOWN_TOPIC_OR_PARTITION, test.topic-a,test.topic-b=INVALID_TOPIC_EXCEPTION}
2022-06-17 14:07:07.660 ERROR --- [-StreamThread-1] org.apache.kafka.clients.Metadata : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Metadata response reported invalid topics [test.topic-a,test.topic-b]
2022-06-17 14:07:07.660 INFO --- [-StreamThread-1] org.apache.kafka.clients.Metadata : [Consumer clientId=test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549-StreamThread-1-consumer, groupId=test-processor] Cluster ID: XYZ
2022-06-17 14:07:07.663 ERROR --- [-StreamThread-1] org.apache.kafka.streams.KafkaStreams : stream-client [test-processor-2ba8d1d3-5bbe-45d3-a832-6a24cf2f5549] Encountered the following exception during processing and Kafka Streams opted to SHUTDOWN_CLIENT. The streams client is going to shut down now.
org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:627) ~[kafka-streams-3.2.0.jar:na]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:551) ~[kafka-streams-3.2.0.jar:na]
Caused by: org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [test.topic-a,test.topic-b]
I tried with the following dependencies:
implementation 'org.apache.kafka:kafka-clients:3.2.0'
implementation 'org.apache.kafka:kafka-streams:3.2.0'
implementation "org.springframework.cloud:spring-cloud-stream"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka"
implementation "org.springframework.cloud:spring-cloud-stream-binder-kafka-streams"
implementation "org.springframework.kafka:spring-kafka"
When I only set one input topic, everything works fine.
I am not able to determine what causes the InvalidTopicException, because I only use permitted characters in topic names and also the comma separator seems correct (else different exceptions occur).
CodePudding user response:
Actually right after posting the question I found out the/one solution/workaround myself. So here it is for future help:
Apparently, I am not allowed to multiplex input topics when my processor topology expects a KTable
as input type. When I change the processor signature to KStream
, it suddenly works:
Not working:
@Bean
public Function<KTable<String, Object>, KStream<String, Object>> process() {
return stringObjectKTable ->
stringObjectKTable
.mapValues(...
Working:
@Bean
public Function<KStream<String, Object>, KStream<String, Object>> process() {
return stringObjectKStream ->
stringObjectKStream
.toTable()
.mapValues(...
I am not sure, if this is expected behaviour or if there is something else wrong, so I appreciate any hints, if there is more underlying.