Home > Software engineering >  Python Proxy Error when streaming XML files from Azure Event Hub using Databricks
Python Proxy Error when streaming XML files from Azure Event Hub using Databricks

Time:07-02

I've got the below piece of code to retrieve XML files, extract some of the tags and save as CSV files. As tag values need to be saved as separate files I'm using foreachbatch method of df.writeStream; to extract and save them separately. See below, the environment/version, the code used and the error returned when executed on Azure Databricks.

Environment:

  • Databricks Runtime version: 10.4 LTS
  • Apache Spark 3.2.1,
  • Scala 2.12
  • Event hubs library from maven: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22

Code:

# Databricks notebook source
import lxml.etree as ET
import pyspark.sql.types as T
from os.path import dirname, join


# Define namespaces found in the xml files to pick elements from the default ("") namespace or specific namespace
namespaces = {
    "": "http://www.fpml.org/FpML-5/reporting",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance"
}

# trade date **********************************
trade_header = T.StructType([
    T.StructField("messageId", T.StringType(), False),
    T.StructField("tradeDate", T.StringType(), False)
])


def to_xml_message_trade_date(xml_string):
    root = ET.fromstring(xml_string)
    messageId = root.find(".//messageId", namespaces).text
    tradeDate = root.find(".//tradeDate", namespaces).text
    return [messageId, tradeDate]

extract_udf = udf(to_xml_message_trade_date, trade_header)
**********************************************

connectionString = "Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxxxx;EntityPath=xxxxx"

ehConf = {
  'eventhubs.connectionString' : sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}

stream_data = spark \
  .readStream \
  .format("eventhubs") \
  .options(**ehConf) \
  .option('multiLine', True) \
  .option('mode', 'PERMISSIVE') \
  .load()

df_str = stream_data.withColumn("data", stream_data["body"].cast("string"))


def write2csv(df, epoch_id):
     df.persist()
     df_tuples = df.select(extract_udf("data").alias("extracted_data"))
     df_parsed = df_tuples.select("extracted_data.*")

     df_parsed \
       .write \
       .format("csv") \
       .mode(SaveMode.Append) \
       .option("header", True) \
       .save("dbfs:/FileStore/Incoming/trade_date/")

     df.unpersist()
 

query = df_str \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write2csv) \
    .trigger(processingTime="1 seconds") \
    .start()

query.awaitTermination()

Error returned:

StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):

StreamingQueryException                   Traceback (most recent call last)
<command-1879221600357983> in <module>
      6     .start()
      7 
----> 8 query.awaitTermination()
      9 
     10 #     .format("csv") \

/databricks/spark/python/pyspark/sql/streaming.py in awaitTermination(self, timeout)
    101             return self._jsq.awaitTermination(int(timeout * 1000))
    102         else:
--> 103             return self._jsq.awaitTermination()
    104 
    105     @property

/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1303         answer = self.gateway_client.send_command(command)
   1304         return_value = get_return_value(
-> 1305             answer, self.gateway_client, self.target_id, self.name) 

I can normally stream and save tag values in a single file using the below code snippet but the issue occurs when I use foreachbatch to save tag values in separate files.

df_trade_date \
  .writeStream \
  .format("csv") \
  .trigger(processingTime="30 seconds") \
  .option("checkpointLocation", "dbfs:/FileStore/checkpoint/") \
  .option("path", "dbfs:/FileStore/Incoming/trade_date/") \
  .option("header", True) \
  .outputMode("append") \
  .start() \
  .awaitTermination()

What am I missing here? Are there any suggestions?

CodePudding user response:

Changing write2csv function with below fixed the issue

def write2csv(df, epoch_id):
 df.persist()
 df_tuples = df.select(extract_udf("data").alias("extracted_data"))
 df_parsed = df_tuples.select("extracted_data.*")

 df_parsed \
   .write \
   .format("csv") \
   .mode("append") \
   .option("header", True) \
   .save("dbfs:/FileStore/Incoming/trade_date/")

 df.unpersist()

Note .mode("append") \ line where I replaced Savemode.Append with

  • Related