Home > Mobile >  How to send data to particular Kafka topic partition?
How to send data to particular Kafka topic partition?

Time:07-24

I have this simple producer and the topic has 2 partitions. I want to send first ten messages to partition 0 and the remaining ten messages to the other partition. Is there any way possible ?

package com.company;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class Main {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "localhost:9092");
        properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<Integer, String> producer = new KafkaProducer<>(properties);

        for (int i = 0; i <= 20; i  ) {
            ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("DemoTopic", i, "Test Message"   i);
            producer.send(producerRecord);
        }
        producer.close();
    }
}

CodePudding user response:

You simply use a different constructor that actually accepts the partition

int count = 0;
ProducerRecord<Integer, String> producerRecord;
for (int i = 0; i <= 20; i  ) {
   int partition = count < 10 ? 0 : 1;
   producerRecord = new ProducerRecord("topic", partition, i, "Message "   i);
   ... 
   count  ;
} 

CodePudding user response:

You will have to write a custom partitioner!

For example, this sends even numbers to partition 0 and odds to partition 1.

import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
 
public class KafkaIntegerPartitioner implements Partitioner {

 
  @Override
  public void configure(Map<String, ?> configs) {
 
  }
 
  @Override
  public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
     int partition = 0;
    Integer keyVal = (Integer)key;
    //<logic for partition>
    int divident = keyVal/10;
    if(divident%2==0){
        return 0;
    }
    return 1;
  }
 
  @Override
  public void close() {
 
  }
} 

and then in the producer

properties.put("partitioner.class","KafkaIntegerPartitioner");
  • Related