Why does Spring Cloud Stream with Kafka binder hash keys differently than a standard Kafka Producer?


I've run into a problem where I need to repartition an existing topic (source) to a new topic (target) with a higher number of partitions (a multiple of the number of previous partitions).

The source topic was written to by a Spring Cloud Stream application using the Kafka Binder. The target topic is being written to by a Kafka Streams (KStreams) application.

The records in the source topic were being partitioned based on a header, with key=null. I tried to explicitly extract this header and set a message key for records in the target topic, and noticed that records with the same partition key were landing in completely different partitions.

After some investigation, I've found the culprit to be the following:

org.springframework.cloud.stream.binder.PartitionHandler.DefaultPartitionSelector

    private static class DefaultPartitionSelector implements PartitionSelectorStrategy {

        @Override
        public int selectPartition(Object key, int partitionCount) {
            int hashCode = key.hashCode();
            if (hashCode == Integer.MIN_VALUE) {
                hashCode = 0;
            }
            return Math.abs(hashCode);
        }
    }

org.springframework.cloud.stream.binder.PartitionHandler

    public int determinePartition(Message<?> message) {
        // ... non relevant code omitted

        partition = this.partitionSelectorStrategy.selectPartition(key,
                    this.partitionCount);
        // protection in case a user selector returns a negative.
        return Math.abs(partition % this.partitionCount);
    }

While the default Kafka partitioning strategy does:

org.apache.kafka.clients.producer.internals.DefaultPartitioner

    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster,
                         int numPartitions) {
        if (keyBytes == null) {
            return stickyPartitionCache.partition(topic, cluster);
        }
        // hash the keyBytes to choose a partition
        return Utils.toPositive(Utils.murmur2(keyBytes)) % numPartitions;
    }
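
To make the difference concrete, here is a small comparison (a sketch; the key, partition count, and class name are made up, and it assumes String keys serialized as UTF-8 bytes, i.e. what the usual StringSerializer produces):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.utils.Utils;

    public class PartitionHashComparison {

        public static void main(String[] args) {
            String key = "order-42";        // hypothetical partition key
            int partitionCount = 12;        // hypothetical partition count

            // Spring Cloud Stream's DefaultPartitionSelector + determinePartition:
            // Java hashCode() of the key object, made positive, modded by the partition count
            int springPartition = Math.abs(key.hashCode()) % partitionCount;

            // Kafka's DefaultPartitioner: murmur2 over the serialized key bytes
            byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
            int kafkaPartition = Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;

            System.out.println("Spring Cloud Stream partition: " + springPartition);
            System.out.println("Kafka DefaultPartitioner partition: " + kafkaPartition);
        }
    }

For most keys the two results differ, which is exactly the behaviour I was seeing.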

In essence, this means that a topic written to by Spring Cloud Stream can never be co-partitioned with a topic written to by a non-Spring Cloud Stream app, unless a custom Partitioner is used (not too difficult to do).
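
As an aside, here is roughly what such a custom selector could look like on the Spring Cloud Stream side (a sketch only, not binder code; the class name is made up, and it assumes String keys serialized as UTF-8 bytes so they match the other app's StringSerializer):

    import java.nio.charset.StandardCharsets;

    import org.apache.kafka.common.utils.Utils;
    import org.springframework.cloud.stream.binder.PartitionSelectorStrategy;

    public class Murmur2PartitionSelector implements PartitionSelectorStrategy {

        @Override
        public int selectPartition(Object key, int partitionCount) {
            // mirror Kafka's DefaultPartitioner: murmur2 over the serialized key bytes;
            // the serialization here must match the key serializer of the other application
            byte[] keyBytes = key.toString().getBytes(StandardCharsets.UTF_8);
            return Utils.toPositive(Utils.murmur2(keyBytes)) % partitionCount;
        }
    }

Since the result is already in the range [0, partitionCount), the extra Math.abs(partition % this.partitionCount) in PartitionHandler is a no-op; the selector just needs to be exposed as a bean and referenced from the binding's producer configuration (the exact property name depends on the Spring Cloud Stream version).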

It should be noted, however, that the above DefaultPartitionSelector is not located in the Kafka Binder module, but in the higher-level spring-cloud-stream module.

What is the reasoning for this design choice? I imagine the default partitioner applies to all binders, not just Kafka, but why does the Kafka Binder not ship its own Partitioner that allows out-of-the-box co-partitioning with non-Spring Cloud Stream apps by default?

CodePudding user response:

As I said in my comment:

Partitioning at the binder level is intended for infrastructure that doesn't support partitioning natively; just don't use it and let Kafka do the partitioning itself.
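
For example (a sketch, not binder code; the header name is an assumption), you can leave partitionKeyExpression unset and surface the key to Kafka itself, e.g. by putting it in a header on the outgoing message and pointing the Kafka binder's messageKeyExpression producer property at it (spring.cloud.stream.kafka.bindings.<binding>.producer.messageKeyExpression=headers['partitionKey']), so the record key is hashed by Kafka's DefaultPartitioner:

    import org.springframework.messaging.Message;
    import org.springframework.messaging.support.MessageBuilder;

    public final class KeyedMessages {

        // builds an outgoing message carrying the Kafka record key in a custom header;
        // messageKeyExpression=headers['partitionKey'] (see above) tells the binder to use it
        static Message<String> withKey(String key, String payload) {
            return MessageBuilder.withPayload(payload)
                    .setHeader("partitionKey", key)
                    .build();
        }
    }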

That said, it's not entirely clear what you mean; the Spring partitioner was written long ago and predates the sticky cache introduced by KIP-480. But even Kafka's partitioner will change the partition if the number of partitions changes when the app is restarted: if there is a key, its hash is modded by the number of partitions; if there is no key, a random (sticky) partition is selected.

Run this example with 10 and then 20 partitions and you will see that.

    import org.apache.kafka.clients.admin.NewTopic;

    import org.springframework.boot.ApplicationRunner;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.context.annotation.Bean;
    import org.springframework.kafka.config.TopicBuilder;
    import org.springframework.kafka.core.KafkaAdmin;
    import org.springframework.kafka.core.KafkaTemplate;

    @SpringBootApplication
    public class So73207602Application {

        public static void main(String[] args) {
            SpringApplication.run(So73207602Application.class, args).close();
        }

        @Bean
        ApplicationRunner runner(KafkaTemplate<String, String> template, NewTopic topic, KafkaAdmin admin) {
            return args -> {
                // prints the partition the keyed record actually landed on
                System.out.println(template.send("topic1", "foo", "bar").get().getRecordMetadata());
            };
        }

        @Bean
        public NewTopic topic() {
            return TopicBuilder.name("topic1").partitions(10).replicas(1).build();
        }

    }

With a null key you will get a different (random) partition each time.
