I've got the below piece of code to retrieve XML files, extract some of the tags and save them as CSV files. As the tag values need to be saved as separate files, I'm using the foreachBatch method of df.writeStream to extract and save them separately. See below for the environment/versions, the code used and the error returned when it is executed on Azure Databricks.
Environment:
- Databricks Runtime version: 10.4 LTS
- Apache Spark 3.2.1
- Scala 2.12
- Event hubs library from maven: com.microsoft.azure:azure-eventhubs-spark_2.12:2.3.22
Code:
# Databricks notebook source
import lxml.etree as ET
import pyspark.sql.types as T
from pyspark.sql.functions import udf
from os.path import dirname, join
# Define namespaces found in the xml files to pick elements from the default ("") namespace or specific namespace
namespaces = {
    "": "http://www.fpml.org/FpML-5/reporting",
    "xsi": "http://www.w3.org/2001/XMLSchema-instance"
}
# trade date **********************************
trade_header = T.StructType([
    T.StructField("messageId", T.StringType(), False),
    T.StructField("tradeDate", T.StringType(), False)
])
def to_xml_message_trade_date(xml_string):
    root = ET.fromstring(xml_string)
    messageId = root.find(".//messageId", namespaces).text
    tradeDate = root.find(".//tradeDate", namespaces).text
    return [messageId, tradeDate]
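# For illustration only: a minimal, hypothetical FpML message of the shape
# this parser expects (real messages carry many more elements):
#
#   <message xmlns="http://www.fpml.org/FpML-5/reporting">
#       <header><messageId>MSG-001</messageId></header>
#       <trade><tradeDate>2022-06-01</tradeDate></trade>
#   </message>
#
# For such a string, to_xml_message_trade_date would return
# ["MSG-001", "2022-06-01"].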
extract_udf = udf(to_xml_message_trade_date, trade_header)
# **********************************************
connectionString = "Endpoint=sb://xxxxxx.servicebus.windows.net/;SharedAccessKeyName=xxxx;SharedAccessKey=xxxxxxx;EntityPath=xxxxx"
ehConf = {
    'eventhubs.connectionString': sc._jvm.org.apache.spark.eventhubs.EventHubsUtils.encrypt(connectionString)
}
stream_data = spark \
    .readStream \
    .format("eventhubs") \
    .options(**ehConf) \
    .option('multiLine', True) \
    .option('mode', 'PERMISSIVE') \
    .load()
df_str = stream_data.withColumn("data", stream_data["body"].cast("string"))
def write2csv(df, epoch_id):
    df.persist()
    df_tuples = df.select(extract_udf("data").alias("extracted_data"))
    df_parsed = df_tuples.select("extracted_data.*")
    df_parsed \
        .write \
        .format("csv") \
        .mode(SaveMode.Append) \
        .option("header", True) \
        .save("dbfs:/FileStore/Incoming/trade_date/")
    df.unpersist()
query = df_str \
    .writeStream \
    .outputMode("append") \
    .foreachBatch(write2csv) \
    .trigger(processingTime="1 seconds") \
    .start()
query.awaitTermination()
Error returned:
StreamingQueryException: An exception was raised by the Python Proxy. Return Message: Traceback (most recent call last):
StreamingQueryException Traceback (most recent call last)
<command-1879221600357983> in <module>
6 .start()
7
----> 8 query.awaitTermination()
9
10 # .format("csv") \
/databricks/spark/python/pyspark/sql/streaming.py in awaitTermination(self, timeout)
101 return self._jsq.awaitTermination(int(timeout * 1000))
102 else:
--> 103 return self._jsq.awaitTermination()
104
105 @property
/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py in __call__(self, *args)
1303 answer = self.gateway_client.send_command(command)
1304 return_value = get_return_value(
-> 1305 answer, self.gateway_client, self.target_id, self.name)
I can normally stream and save the tag values to a single file location using the below code snippet, but the issue occurs when I use foreachBatch to save the tag values in separate files.
df_trade_date \
    .writeStream \
    .format("csv") \
    .trigger(processingTime="30 seconds") \
    .option("checkpointLocation", "dbfs:/FileStore/checkpoint/") \
    .option("path", "dbfs:/FileStore/Incoming/trade_date/") \
    .option("header", True) \
    .outputMode("append") \
    .start() \
    .awaitTermination()
What am I missing here? Are there any suggestions?
CodePudding user response:
Changing the write2csv function as below fixed the issue:
def write2csv(df, epoch_id):
    df.persist()
    df_tuples = df.select(extract_udf("data").alias("extracted_data"))
    df_parsed = df_tuples.select("extracted_data.*")
    df_parsed \
        .write \
        .format("csv") \
        .mode("append") \
        .option("header", True) \
        .save("dbfs:/FileStore/Incoming/trade_date/")
    df.unpersist()
Note .mode("append") \
line where I replaced Savemode.Append
with
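For context: SaveMode is a JVM-side Scala/Java enum (org.apache.spark.sql.SaveMode) and is not defined in a PySpark notebook, so referencing it inside the foreachBatch callback raises a NameError when the micro-batch function is invoked, which py4j reports as the "An exception was raised by the Python Proxy" StreamingQueryException shown above. In PySpark, DataFrameWriter.mode() takes a plain string instead. A minimal sketch of the accepted values (df_out is a hypothetical placeholder DataFrame):

# PySpark passes the save mode as a string: "append", "overwrite",
# "ignore", or "error"/"errorifexists".
df_out.write \
    .format("csv") \
    .mode("append") \
    .option("header", True) \
    .save("dbfs:/FileStore/Incoming/trade_date/")
# The enum form, .mode(SaveMode.Append), exists only in the Scala/Java API
# after importing org.apache.spark.sql.SaveMode.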