Spark Streaming Writing Data To Kafka Topic

Time:03-22

I am trying to write a DataFrame to a Kafka topic inside foreachRDD. I am using the code below:

    mesg.foreachRDD(rdd => {
      Dataframe.write.format("kafka")
        .option("kafka.bootstrap.servers", "host")
        .option("subscribe", "topic")
        .option("principal", "Kerberos-principal")
        .option("keytab", "kerberos-keytab")
        .save()
    })


I am getting a NullPointerException. Specifically, I need to write a DataFrame to a Kafka topic. Can anyone help with this? Note: Dataframe here is obtained by converting the RDD to a DataFrame and removing some fields from the input JSON sent to the Kafka topic.

    Exception in thread "main" java.lang.NullPointerException
        at java.util.regex.Matcher.getTextLength(Matcher.java:1283)
        at java.util.regex.Matcher.reset(Matcher.java:309)
        at java.util.regex.Matcher.<init>(Matcher.java:229)
        at java.util.regex.Pattern.matcher(Pattern.java:1093)

Answer:

The NullPointerException was caused by a configuration error, which has been resolved. To write a DataFrame to a Kafka topic from an RDD, follow the approach below:

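import org.apache.spark.sql.functions.{col, struct, to_json}
import sparkSession.implicits._

// originalDf and "keyColumn" are placeholders for your own DataFrame and key column.
// The Kafka sink expects an optional "key" and a required "value" column, each string or binary.
val df = originalDf.select(col("keyColumn").cast("string"), to_json(struct($"*"))).toDF("key", "value")

// bootstrapServers and topic are placeholders for values read from your config.
df.write.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("topic", topic).save()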
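Inside the streaming job from the question, the same write can run once per micro-batch. Below is a minimal sketch, not the answerer's code, assuming Spark 2.2+ with the spark-sql-kafka-0-10 and spark-streaming artifacts on the classpath and a DStream of raw JSON strings. The names `KafkaJsonSink`, `toKafkaRecords`, `writeEachBatch` and `fieldsToDrop` are hypothetical, and the broker list and topic stand in for your own config values.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, struct, to_json}
import org.apache.spark.streaming.dstream.DStream

// Sketch only: assumes Spark 2.2+ with spark-sql-kafka-0-10 and spark-streaming available.
object KafkaJsonSink {

  // Drops the unwanted fields and packs every remaining column into one JSON
  // string in a column named "value", which the Kafka sink uses as the message body.
  def toKafkaRecords(df: DataFrame, fieldsToDrop: Seq[String]): DataFrame =
    df.drop(fieldsToDrop: _*)
      .select(to_json(struct(col("*"))).as("value"))

  // Writes each micro-batch of JSON strings to Kafka with an ordinary batch write.
  def writeEachBatch(
      messages: DStream[String],
      fieldsToDrop: Seq[String],
      bootstrapServers: String, // hypothetical: read from your own config
      topic: String             // hypothetical: read from your own config
  ): Unit =
    messages.foreachRDD { (rdd: RDD[String]) =>
      if (!rdd.isEmpty()) {
        // Reuse (or lazily create) the session from the RDD's SparkContext.
        val spark = SparkSession.builder().config(rdd.sparkContext.getConf).getOrCreate()
        import spark.implicits._
        // Parse the raw JSON strings into a DataFrame with an inferred schema.
        val df = spark.read.json(rdd.toDS())
        toKafkaRecords(df, fieldsToDrop).write
          .format("kafka")
          .option("kafka.bootstrap.servers", bootstrapServers)
          .option("topic", topic) // the sink option is "topic"; "subscribe" is for reading
          .save()
      }
    }
}
```

Keeping the transformation in `toKafkaRecords` separate from the write means it can be checked against a local SparkSession without a running broker. It assumes `mesg` has first been mapped to a `DStream[String]` of JSON values before calling `KafkaJsonSink.writeEachBatch`.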

Answer:

Note: If you want to avoid hard-coding a column name in the select statement, use this approach instead:

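// originalDf, bootstrapServers and topic are placeholders for your DataFrame and config values.
// .as("value") must be applied to the column, not to the whole Dataset; to_json already yields a string.
val df = originalDf.select(to_json(struct($"*")).as("value"))

df.write.format("kafka").option("kafka.bootstrap.servers", bootstrapServers).option("topic", topic).save()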
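The question passes `principal` and `keytab` as plain writer options, which the Kafka sink does not interpret; as described in the Spark Kafka integration guide, Kafka client settings are forwarded to the producer only when prefixed with `kafka.`. Below is a hedged sketch of building those options, assuming SASL/GSSAPI (Kerberos), a broker service name of `kafka`, and a kafka-clients version that accepts `sasl.jaas.config` (0.10.2 or later). `KafkaKerberosOptions` and its parameters are hypothetical.

```scala
// Sketch: builds DataFrameWriter options for a Kerberized Kafka cluster.
// Assumptions (not from the original post): SASL/GSSAPI, broker service name "kafka",
// and a kafka-clients version that accepts sasl.jaas.config (0.10.2+).
object KafkaKerberosOptions {

  def build(
      bootstrapServers: String,
      topic: String,
      principal: String,
      keytabPath: String,
      securityProtocol: String = "SASL_PLAINTEXT", // or "SASL_SSL", depending on the cluster
      serviceName: String = "kafka"
  ): Map[String, String] = {
    // Inline JAAS login entry for the Kafka producer, used instead of a separate JAAS file.
    val jaas =
      "com.sun.security.auth.module.Krb5LoginModule required " +
        "useKeyTab=true storeKey=true " +
        s"""keyTab="$keytabPath" principal="$principal";"""

    Map(
      "kafka.bootstrap.servers"          -> bootstrapServers,
      "topic"                            -> topic,
      "kafka.security.protocol"          -> securityProtocol,
      "kafka.sasl.mechanism"             -> "GSSAPI",
      "kafka.sasl.kerberos.service.name" -> serviceName,
      "kafka.sasl.jaas.config"           -> jaas
    )
  }
}
```

The map can be passed in one call with `df.write.format("kafka").options(opts).save()`. The keytab path has to be readable on the executors as well as the driver, which usually means shipping the file with `spark-submit --files`.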
