Spring Cloud Stream Kafka Binder incorrect partitioning

Time:12-07

I'm using the org.springframework.cloud:spring-cloud-stream-binder-kafka library and I'm having trouble partitioning messages in a topic. My topic has 4 partitions, but I'm only seeing events in partition 0, i.e. the publisher is not partitioning the events correctly.

When I check the topic partition that has messages in it, I do see that each message has a proper value in its key field, but the key doesn't seem to be used for partitioning, which confuses me.

I followed the official partitioning example and have the following code:

Producer code

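import com.fasterxml.jackson.databind.SerializationFeature
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.reactor.asFlux
import org.slf4j.LoggerFactory
import org.springframework.context.annotation.Bean
import org.springframework.kafka.support.KafkaHeaders
import org.springframework.messaging.Message
import org.springframework.messaging.support.MessageBuilder
import org.springframework.stereotype.Component
import reactor.core.publisher.Flux

@Component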
class FooEventPublisher {
    private val logger = LoggerFactory.getLogger(this::class.java)
    private val mapper = jacksonObjectMapper()
        .findAndRegisterModules()
        .configure(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS, false)

    private val ingressChannel = Channel<FooEvent>(capacity = Channel.UNLIMITED)

    /** other component will call this to pipe in events to be published */
    suspend fun send(event: FooEvent) = ingressChannel.send(event)

    /** helper function to convert [FooEvent] into a [Message] with a JSON payload */
    private fun FooEvent.toMessage(): Message<ByteArray> {
        val payload = mapper.writeValueAsBytes(this)
        val partitionKey = this.name
        val message = MessageBuilder
            .withPayload(payload)
            .setHeader(KafkaHeaders.MESSAGE_KEY, partitionKey.toByteArray())
            .setHeader("partitionKey", partitionKey.toByteArray())
            .build()
        return message
    }

    @Bean
    fun publishFooEvents(): () -> Flux<Message<ByteArray>> = {
        ingressChannel
            .consumeAsFlow()
            .map {
                try {
                    it.toMessage()
                } catch (err: Exception) {
                    logger.error("Skipping event because of encoding failure", err)
                    logger.trace("problematic event=$it")
                    null
                }
            }
            .filterNotNull()
            .asFlux()
    }
}

Relevant Spring Configuration

spring:
  cloud:
    function:
      definition: publishFooEvents
    stream:
      kafka:
        binder:
          brokers: localhost:9092
      bindings:
        publishFooEvents-out-0:
          destination: kf-foo-events-topic
          producer:
            partition-key-expression: headers['partitionKey']

I expected the Kafka binder to use the partitionKey header to choose the partition, e.g. all messages with key 1234 would go to partition 1 and messages with key 5678 would go to partition 2.

What am I missing here? Why isn't the binder detecting that the target topic has 4 partitions and using that information to partition the events?

Edit: fixed the key in the example above.

Answer:

Partitioning at the binder level is not intended for infrastructure that supports partitioning natively, such as Kafka. Just use native Kafka partitioning instead, which by default is based on the message key.
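To make "based on the key" concrete: the Kafka producer's default partitioner derives the partition from the serialized key bytes, so equal keys land in the same partition even when each message carries its own byte[] instance. The sketch below is a simplified model, assuming 4 partitions; partitionForKeyBytes is a hypothetical helper, and contentHashCode() stands in for the murmur2 hash that Kafka's default partitioner actually uses.

```kotlin
// Simplified model of Kafka-style key-based partition selection.
// ASSUMPTION: contentHashCode() is a stand-in; Kafka really uses murmur2 over the key bytes.

/** Clears the sign bit so the hash is non-negative (same idea as Kafka's Utils.toPositive). */
fun toPositive(n: Int): Int = n and 0x7fffffff

/** Hypothetical helper: choose a partition from the *contents* of the key bytes. */
fun partitionForKeyBytes(keyBytes: ByteArray, numPartitions: Int): Int {
    require(numPartitions > 0) { "numPartitions must be positive" }
    return toPositive(keyBytes.contentHashCode()) % numPartitions
}

fun main() {
    // Two separate arrays with identical contents, as two calls to
    // "1234".toByteArray() would produce for two events with the same name.
    val first = "1234".toByteArray()
    val second = "1234".toByteArray()
    println(partitionForKeyBytes(first, 4) == partitionForKeyBytes(second, 4)) // true
}
```

In the question's setup this presumably means removing partition-key-expression and letting the producer partition on the KafkaHeaders.MESSAGE_KEY header that toMessage() already sets.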

Furthermore, you are setting the partitionKey header to a byte[]; it should remain a String so that the hash algorithm uses its value. The hash code of a byte[] depends on the array's system identity, not on its contents.
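A quick way to see the identity-versus-contents difference on the JVM: two arrays built from the same string are equal by contents but are distinct objects, and an array's hashCode() is inherited from Object. This is an illustrative sketch; firstKey and secondKey are made-up names.

```kotlin
// Two keys built the same way the question's toMessage() builds its partitionKey header.
val firstKey: ByteArray = "1234".toByteArray()
val secondKey: ByteArray = "1234".toByteArray()

fun main() {
    // Same bytes, different objects.
    println(firstKey.contentEquals(secondKey)) // true
    println(firstKey === secondKey)            // false

    // Array hashCode() is identity-based, so this is usually false (not guaranteed either way).
    println(firstKey.hashCode() == secondKey.hashCode())

    // Content-based hashes are stable across instances.
    println(firstKey.contentHashCode() == secondKey.contentHashCode()) // true
    println("1234".hashCode() == "1234".hashCode())                   // true
}
```

A hashCode()-based selector therefore gets an unrelated number for every new ByteArray, while a String key always hashes the same way.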

Quoting the question: "e.g. all messages with key 1234 would go to partition 1 and messages with key 1234 would go to partition 2"

That makes no sense; I presume you meant to specify different keys.
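Even with distinct String keys, hash-based selection does not promise a particular key-to-partition assignment. The sketch below is a hypothetical model of binder-level selection, assuming the default strategy takes the absolute value of key.hashCode() modulo the partition count (check your Spring Cloud Stream version for the exact behavior); binderStylePartition is an illustrative name, not a library API.

```kotlin
import kotlin.math.abs

/**
 * Hypothetical model of binder-level partition selection.
 * ASSUMPTION: partition = abs(key.hashCode()) % partitionCount.
 */
fun binderStylePartition(key: Any, partitionCount: Int): Int {
    require(partitionCount > 0) { "partitionCount must be positive" }
    var hash = key.hashCode()
    if (hash == Int.MIN_VALUE) hash = 0 // abs(Int.MIN_VALUE) would stay negative
    return abs(hash) % partitionCount
}

fun main() {
    // prints "1234 -> partition 2", then "5678 -> partition 2"
    for (key in listOf("1234", "5678")) {
        println("$key -> partition ${binderStylePartition(key, 4)}")
    }
}
```

Under this model, 1234 and 5678 both map to partition 2 of 4: each key consistently reaches the same partition, but two different keys can share one.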
