Is it possible to create a multi-binder binding with Spring-Cloud-Streams Kafka-Streams to stream from one Kafka cluster to another?

Time:08-13

I want to create a Kafka Streams application with Spring Cloud Stream that integrates two different Kafka clusters/setups. I tried to implement it using a multi-binder configuration as described in the documentation, similar to the examples here: https://github.com/spring-cloud/spring-cloud-stream-samples/tree/main/multi-binder-samples

Given a simple function like this:

@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}

In the configuration I'm trying to bind the input and the output to different binders.

spring.cloud:
  stream:
    bindings:
      analyticsEventProcessor-in-0:
        destination: analytics-events
        binder: cluster1-kstream
      analyticsEventProcessor-out-0:
        destination: update-events
        binder: cluster2-kstream

    binders:
      cluster1-kstream:
        type: kstream
        environment:
          spring:
            cloud:
              stream:
                kafka:
                  binder:
                    brokers: <url cluster1>:9093
                    configuration:
                      security.protocol: SSL
                      schema.registry.url: <schema-registry-url-cluster1>
                      schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                      schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                      schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                      schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                      ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                      ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                      ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2
                  streams:
                    binder:
                      brokers: <url cluster1>:9093
                      configuration:
                        security.protocol: SSL
                        schema.registry.url: <schema-registry-url-cluster1>
                        schema.registry.ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                        schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                        schema.registry.ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                        schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                        ssl.truststore.location: /mnt/secrets/cluster1/truststore.jks
                        ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER1_TRUST-STORE-PASSWORD}
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/cluster1/keystore.jks
                        ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER1_KEY-STORE-PASSWORD}
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2
      cluster2-kstream:
        type: kstream
        environment:
          spring:
            cloud:
              stream:
                kafka:
                  binder:
                    brokers: <url cluster2>:9093
                    configuration:
                      security.protocol: SSL
                      schema.registry.url: <schema-registry-url-cluster2>
                      schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                      schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                      schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                      schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                      ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                      ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                      ssl.truststore.type: JKS
                      ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                      ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                      ssl.keystore.type: JKS
                      ssl.enabled.protocols: TLSv1.2
                  streams:
                    binder:
                      brokers: <url cluster2>:9093
                      configuration:
                        security.protocol: SSL
                        schema.registry.url: <schema-registry-url-cluster2>
                        schema.registry.ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                        schema.registry.ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                        schema.registry.ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                        schema.registry.ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                        ssl.truststore.location: /mnt/secrets/cluster2/truststore.jks
                        ssl.truststore.password: ${SPRING_KAFKA_SSL_CLUSTER2_TRUST-STORE-PASSWORD}
                        ssl.truststore.type: JKS
                        ssl.keystore.location: /mnt/secrets/cluster2/keystore.jks
                        ssl.keystore.password: ${SPRING_KAFKA_SSL_CLUSTER2_KEY-STORE-PASSWORD}
                        ssl.keystore.type: JKS
                        ssl.enabled.protocols: TLSv1.2

I first ran the application entirely against a single cluster, which worked well. With the multi-binder configuration I always get this error:

2022-08-10 15:28:42.892  WARN 1 --- [-StreamThread-2] org.apache.kafka.clients.NetworkClient   : [Consumer clientId=<clientid>-StreamThread-2-consumer, groupId=<group-id>] Error while fetching metadata with correlation id 2 : {analytics-events=TOPIC_AUTHORIZATION_FAILED}
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata        : [Consumer clientId=<client-id>, groupId=<group-id>] Topic authorization failed for topics [analytics-events]
2022-08-10 15:28:42.893  INFO 1 --- [-StreamThread-2] org.apache.kafka.clients.Metadata        : [Consumer clientId=<client-id>, groupId=<group-id>] Cluster ID: <cluster-id>
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] c.s.a.a.e.UncaughtExceptionHandler       : org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
2022-08-10 15:28:42.893 ERROR 1 --- [-StreamThread-2] org.apache.kafka.streams.KafkaStreams    : stream-client [<client-id>] Replacing thread in the streams uncaught exception handler

org.apache.kafka.streams.errors.StreamsException: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:642) ~[kafka-streams-3.1.1.jar!/:na]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:576) ~[kafka-streams-3.1.1.jar!/:na]
Caused by: org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [analytics-events]

I verified the Kafka client certificates with keytool and they appear to be correct; the password environment variables are also set correctly, and the logged consumer config uses the correct broker URL.

Is it possible, within a single KStream function, to use different Kafka clusters via multi-binder for the input and the output of a stream, or does this only work with binders of type kafka?

CodePudding user response:

In Kafka Streams, you cannot connect to two different clusters within a single application. This means that, when using a Spring Cloud Stream Kafka Streams function, you cannot consume from one cluster on the inbound and write to another cluster on the outbound. See this SO [thread][1] for more details.

As a workaround, you can receive from and write to the same cluster in your Kafka Streams function, and then bridge the output topic over to the second cluster using a regular Kafka binder-based function. Regular (non-Kafka-Streams) functions can consume from and publish to different clusters.

@Bean
public Function<KStream<String, AnalyticsEvent>, KStream<String, UpdateEvent>> analyticsEventProcessor() {
    return input -> input
            .filter(new AnalyticsPredicate())
            .map(new AnalyticsToUpdateEventMapper());
}
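With this workaround, both bindings of the Kafka Streams function point at the same binder. A sketch, reusing the binder name from the question (the output becomes a staging topic that still lives on cluster 1):

```yaml
spring:
  cloud:
    stream:
      bindings:
        analyticsEventProcessor-in-0:
          destination: analytics-events
          binder: cluster1-kstream
        analyticsEventProcessor-out-0:
          destination: update-events   # staging topic, still on cluster 1
          binder: cluster1-kstream
```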

This function must receive from and write to the same cluster. You can then add another function, as below.

@Bean
public Function<?, ?> bridgeFunction() {
    ....
}

For this function, input is cluster-1 and output is cluster-2.
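A sketch of the wiring, assuming two additional regular binders of type kafka (hypothetically named cluster1-kafka and cluster2-kafka) are declared alongside the kstream binders; the bridge body itself can be a simple pass-through such as `return event -> event;`:

```yaml
spring:
  cloud:
    stream:
      function:
        definition: analyticsEventProcessor;bridgeFunction
      bindings:
        bridgeFunction-in-0:
          destination: update-events   # staging topic on cluster 1
          binder: cluster1-kafka       # regular binder, type: kafka
        bridgeFunction-out-0:
          destination: update-events   # final topic on cluster 2
          binder: cluster2-kafka       # regular binder, type: kafka
```

The cluster assignment comes entirely from the binder property of each binding, not from the function code.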

When using this workaround, make sure to also include the regular Kafka binder as a dependency: spring-cloud-stream-binder-kafka.
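For example, with Maven (the version is managed by the Spring Cloud dependencies BOM), alongside the existing Kafka Streams binder dependency:

```xml
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
```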

Keep in mind that this approach has disadvantages, such as the overhead and added latency of the extra topic. However, it is a workable solution for this use case. For more options, see the SO thread mentioned above.



  [1]: https://stackoverflow.com/questions/45847690/how-to-connect-to-multiple-clusters-in-a-single-kafka-streams-application