How to write a custom DataFrame to Event Hubs from ADLS


I would like to write a custom DataFrame to Event Hubs.

val customDf = spark.read.json("path/to/json")

Event Hubs connection string:
val connectionString = new com.microsoft.azure.eventhubs.ConnectionStringBuilder("Endpoint=sb://test.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxxxxxxxxxxx=").setEventHubName("test")

val ehConf = EventHubsConf(connectionString.toString).setConsumerGroup("testing")
val eventhubSchema = spark.readStream.format("eventhubs").options(ehConf.toMap).option("eventhubs.partition.count", "4").load()

eventhubSchema.printSchema 

will show the default schema of the Event Hubs source (the payload is carried in the binary body column).
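With the standard azure-event-hubs-spark connector, the printed schema typically looks roughly like this (exact fields can vary by connector version):

    root
     |-- body: binary (nullable = true)
     |-- partition: string (nullable = true)
     |-- offset: string (nullable = true)
     |-- sequenceNumber: long (nullable = true)
     |-- enqueuedTime: timestamp (nullable = true)
     |-- publisher: string (nullable = true)
     |-- partitionKey: string (nullable = true)
     |-- properties: map (nullable = true)
     |-- systemProperties: map (nullable = true)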

Now I want to write the above customDf to Event Hubs.

Method 1:
    ds = customDf \
      .selectExpr("partitionKey", "body") \
      .writeStream \
      .format("eventhubs") \
      .options(ehConf.toMap) \
      .option("checkpointLocation", "///output.txt") \
      .start()

Method 2:

ds = customDf \
  .writeStream \
  .format("eventhubs") \
  .options(ehConf.toMap) \
  .option("checkpointLocation", "///output.txt") \
  .start()

How do I write the customDf to Event Hubs? I even tried a select with get_json_object and a cast to string type, but I am getting:

org.apache.spark.sql.AnalysisException: cannot resolve 'body' given input columns 

How do I write the customDf to Event Hubs?

CodePudding user response:

You need to transform the data in your DataFrame into a single column, either binary or string; which one you choose really depends on your consumers. The simplest way to do that is to pack all of the data as JSON, using the combination of the to_json and struct functions:

import pyspark.sql.functions as F

stream = customDf \
      .select(F.to_json(F.struct("*")).alias("body")) \
      .writeStream \
      .format("eventhubs") \
      .options(ehConf.toMap) \
      .option("checkpointLocation", "...") \
      .start()
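If you also want to control how events are routed to partitions (as attempted in Method 1), you can keep a partitionKey column next to body; the connector should pick it up when present. A minimal sketch, assuming customDf has an id column (hypothetical, replace with whatever field you want as the routing key):

    import pyspark.sql.functions as F

    stream = customDf \
          .select(
              F.col("id").cast("string").alias("partitionKey"),  # hypothetical routing key column
              F.to_json(F.struct("*")).alias("body")             # remaining data packed as JSON
          ) \
          .writeStream \
          .format("eventhubs") \
          .options(ehConf.toMap) \
          .option("checkpointLocation", "...") \
          .start()

Also note that customDf is loaded with spark.read, i.e. it is a batch DataFrame; if it is not a streaming DataFrame, the same select should work with write.format("eventhubs").options(ehConf.toMap).save() instead of writeStream.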