How to pass Kafka's --producer.config through Java

Time:07-20

I'm using the command below to send records to a secure Kafka cluster:

bin/kafka-console-producer.sh --topic <My Kafka topic name> --bootstrap-server <My custom bootstrap server> --producer.config /Users/DY/SSL/ssl.properties

As you can see, I have passed the path of the ssl.properties file to the --producer.config switch.

The ssl.properties file contains the details needed to connect to the secure Kafka cluster. Its contents are below:

security.protocol=SSL
ssl.truststore.location=<My custom value>
ssl.truststore.password=<My custom value>
ssl.key.password=<My custom value>
ssl.keystore.location=<My custom value>
ssl.keystore.password=<My custom value>

Now I want to replicate this command with a Java producer. The code I've written is:

import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

public class MyProducer {
    public static void main(String[] args) throws Exception {
        Properties properties = new Properties();
        properties.put("bootstrap.servers", <My bootstrap server>);
        properties.put("key.serializer", StringSerializer.class);
        properties.put("value.serializer", StringSerializer.class);
        properties.put("producer.config", "/Users/DY/SSL/ssl.properties");

        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
        // The first constructor argument is the topic name.
        ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
                <My Kafka topic name>, "Hello World from program");

        Future<RecordMetadata> future = kafkaProducer.send(
                producerRecord,
                (metadata, exception) -> {
                    if (exception != null) {
                        System.out.println("Something went wrong");
                        exception.printStackTrace();
                    } else {
                        System.out.println("Successfully transmitted");
                    }
                });

        // Block until the send completes (or fails).
        future.get();

        kafkaProducer.close();
    }
}

However, passing the file with properties.put("producer.config", "/Users/DY/SSL/ssl.properties"); does not seem to work. Could anybody let me know the appropriate way to do this?

CodePudding user response:

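You have to set each one as a discrete property in the producer Properties. producer.config is an option of the command-line tool, not a producer property, so the Java client does not read the file for you.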

You could use Properties.load() with a FileInputStream or FileReader to load them from the file into your Properties object.
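The Properties.load() suggestion can be sketched roughly as follows. This is a minimal illustration, not a definitive implementation: the class name ProducerConfigLoader and the methods loadFromFile and buildProducerProperties are hypothetical helpers, and the serializer class names are passed as strings so the sketch compiles without the Kafka client on the classpath.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;

class ProducerConfigLoader {

    // Hypothetical helper: reads every key=value pair from the file
    // (e.g. ssl.properties) into a new Properties object.
    static Properties loadFromFile(String path) throws IOException {
        Properties properties = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            properties.load(in);
        }
        return properties;
    }

    // Hypothetical helper: starts from the file contents and adds the
    // settings that the question's code sets programmatically.
    static Properties buildProducerProperties(String path, String bootstrapServers)
            throws IOException {
        Properties properties = loadFromFile(path);
        properties.put("bootstrap.servers", bootstrapServers);
        properties.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return properties;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.out.println(
                    "usage: java ProducerConfigLoader <properties file> <bootstrap servers>");
            return;
        }
        Properties properties = buildProducerProperties(args[0], args[1]);
        // Print only a non-secret setting to confirm the file was read.
        System.out.println("security.protocol="
                + properties.getProperty("security.protocol"));
    }
}
```

The resulting Properties object can then be handed to new KafkaProducer<String, String>(properties) in place of the one built in the question, with the "producer.config" entry dropped.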

CodePudding user response:

Rather than loading the properties from a file, you can set them individually using the static client config constants, as below:

Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

// for SSL Encryption
properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL");
properties.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "<My custom value>");
properties.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "<My custom value>");

// for SSL Authentication
properties.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "<My custom value>");
properties.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "<My custom value>");
properties.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "<My custom value>");

The required imports are:

import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;