I have this simple producer and the topic has 2 partitions. I want to send first ten messages to partition 0 and the remaining ten messages to the other partition. Is there any way possible ?
package com.company;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;
public class Main {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put("bootstrap.servers", "localhost:9092");
properties.put("key.serializer", "org.apache.kafka.common.serialization.IntegerSerializer");
properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<Integer, String> producer = new KafkaProducer<>(properties);
for (int i = 0; i <= 20; i ) {
ProducerRecord<Integer, String> producerRecord = new ProducerRecord<>("DemoTopic", i, "Test Message" i);
producer.send(producerRecord);
}
producer.close();
}
}
CodePudding user response:
You simply use a different constructor that actually accepts the partition
int count = 0;
ProducerRecord<Integer, String> producerRecord;
for (int i = 0; i <= 20; i ) {
int partition = count < 10 ? 0 : 1;
producerRecord = new ProducerRecord("topic", partition, i, "Message " i);
...
count ;
}
CodePudding user response:
You will have to write a custom partitioner!
For example, this sends even numbers to partition 0 and odds to partition 1.
import java.util.Map;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
public class KafkaIntegerPartitioner implements Partitioner {
@Override
public void configure(Map<String, ?> configs) {
}
@Override
public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
int partition = 0;
Integer keyVal = (Integer)key;
//<logic for partition>
int divident = keyVal/10;
if(divident%2==0){
return 0;
}
return 1;
}
@Override
public void close() {
}
}
and then in the producer
properties.put("partitioner.class","KafkaIntegerPartitioner");