I am trying to parse multiple XML files with PySpark. All the XML files share the same known schema.
First I load all the files as text into a Spark DataFrame:
path = 'c://path/to/xml/files/*.xml'
df = spark.read.text(path)
At this point my DF looks like this:
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
| value
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
|<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>|
|<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>|
--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
The XML file schema is as follows:
df.printSchema()
root
|-- Header: struct (nullable = false)
| |-- tag1: string (nullable = false)
| |-- tag2: integer (nullable = false)
| |-- tag3: timestamp (nullable = false)
|-- Body: struct (nullable = false)
| |-- Pair: array (nullable = false)
| | |-- element: struct (containsNull = true)
| | | |-- N: string (nullable = false)
| | | |-- V: string (nullable = false)
So the final output after parsing should look like this:
--------- ----- ------------------------ --- --
|tag1 | tag2| tag3 | N |V |
--------- ----- ------------------------ --- --
|some str1| 2 |2022-02-16 10:39:26.730 |N1 |V1|
|some str1| 2 |2022-02-16 10:39:26.730 |N2 |V2|
|some str1| 2 |2022-02-16 10:39:26.730 |N3 |V3|
|some str2| 5 |2022-02-17 10:39:26.730 |N4 |V4|
|some str2| 5 |2022-02-17 10:39:26.730 |N5 |V5|
--------- ----- ------------------------ --- --
That is, the "Header" element should repeat for every N/V pair that comes from the same XML string.
I think I have found a way to extract all the header tags with xpath or xml.etree.ElementTree, but I don't understand how to extract my N/V pairs into something that I can explode later.
What am I missing?
CodePudding user response:
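Try reading the files with the spark-xml package. The snippet below uses a books.xml example; for your data the rowTag would be 'Msg' rather than 'book':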
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
df = spark.read.format('xml').options(rowTag='book').load('books.xml')
See https://github.com/databricks/spark-xml#python-api
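If adding the spark-xml package is not an option, the xml.etree.ElementTree route mentioned in the question can also produce the N/V pairs. The following is only a minimal sketch under assumptions: the helper name parse_msg is hypothetical, each string is assumed to hold one complete <Msg> element, and tag3 is left as a string rather than converted to a timestamp.

```python
import xml.etree.ElementTree as ET


def parse_msg(xml_string):
    """Return one (tag1, tag2, tag3, N, V) tuple per <Pair> in a <Msg> string."""
    root = ET.fromstring(xml_string)
    header = root.find("Header")
    # Header values are shared by every pair in the same message.
    tag1 = header.findtext("tag1")
    tag2 = int(header.findtext("tag2"))
    tag3 = header.findtext("tag3")  # kept as a string in this sketch
    return [
        (tag1, tag2, tag3, pair.findtext("N"), pair.findtext("V"))
        for pair in root.iterfind("Body/Pair")
    ]


# Second sample message from the question.
sample = (
    "<Msg><Header><tag1>some str2</tag1><tag2>5</tag2>"
    "<tag3>2022-02-17 10:39:26.730</tag3></Header>"
    "<Body><Pair><N>N4</N><V>V4</V></Pair>"
    "<Pair><N>N5</N><V>V5</V></Pair></Body></Msg>"
)
rows = parse_msg(sample)
for row in rows:
    print(row)
```

Because the function already returns one tuple per pair, no explode is needed afterwards. On the text DataFrame from the question, something like df.rdd.flatMap(lambda r: parse_msg(r.value)).toDF(['tag1', 'tag2', 'tag3', 'N', 'V']) should then give one row per pair, though this has not been run against the real files.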
CodePudding user response:
Depending on your Spark version, you have to add the matching spark-xml package to the environment. I am using Spark 2.4.0, and the following spark-xml version worked for me:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.7.0 pyspark-shell'
The file at input_path looks like the following:
<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>
<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>
input_path = 'src/input/input.xml'
xmlDF = spark.read.format('xml').option('rowTag', 'Msg').load(input_path)
xmlDF.printSchema()
root
|-- Body: struct (nullable = true)
| |-- Pair: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- N: string (nullable = true)
| | | |-- V: string (nullable = true)
|-- Header: struct (nullable = true)
| |-- tag1: string (nullable = true)
| |-- tag2: long (nullable = true)
| |-- tag3: timestamp (nullable = true)
Since Spark allows only one generator such as explode per select clause, you can split the work into two chained selects:
from pyspark.sql.functions import col, explode
xmlDF.select(
'*',
explode("Body.Pair.N").alias('N')
).select(
'N',
explode("Body.Pair.V").alias('V'),
col("Header.tag1").alias('tag1'),
col("Header.tag2").alias('tag2'),
col("Header.tag3").alias('tag3'),
) \
.dropDuplicates() \
.show(truncate=False)
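It will give the following result based on your input. Note that every N is paired with every V from the same message, so there are more rows than N/V pairs in the XML: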
--- --- --------- ---- ----------------------
|N |V |tag1 |tag2|tag3 |
--- --- --------- ---- ----------------------
|N2 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N4 |V5 |some str2|5 |2022-02-17 10:39:26.73|
|N1 |V3 |some str1|2 |2022-02-16 10:39:26.73|
|N5 |V5 |some str2|5 |2022-02-17 10:39:26.73|
|N5 |V4 |some str2|5 |2022-02-17 10:39:26.73|
|N4 |V4 |some str2|5 |2022-02-17 10:39:26.73|
|N1 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V3 |some str1|2 |2022-02-16 10:39:26.73|
|N2 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N1 |V2 |some str1|2 |2022-02-16 10:39:26.73|
|N3 |V1 |some str1|2 |2022-02-16 10:39:26.73|
|N2 |V3 |some str1|2 |2022-02-16 10:39:26.73|
--- --- --------- ---- ----------------------
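The result above contains combinations such as N2 with V1, which do not occur in the input: exploding Body.Pair.N and Body.Pair.V separately pairs every N with every V of the same message. The plain-Python model below illustrates the difference between that and exploding the Pair array once. It is only an illustration of the row logic, not Spark code, and the variable names are made up for the example.

```python
from itertools import product

# The (N, V) structs in Body.Pair for the first sample message.
pairs = [("N1", "V1"), ("N2", "V2"), ("N3", "V3")]

# Exploding Body.Pair.N and then Body.Pair.V: every N meets every V.
n_values = [n for n, _ in pairs]
v_values = [v for _, v in pairs]
exploded_separately = list(product(n_values, v_values))

# Exploding Body.Pair once: each struct becomes one row, so N and V stay together.
exploded_once = list(pairs)

print(len(exploded_separately), "rows when N and V are exploded separately")
print(len(exploded_once), "rows when Pair is exploded once")
```

In PySpark the second variant would presumably be a single select with explode("Body.Pair").alias("Pair") next to the Header columns, followed by a select of Pair.N and Pair.V. That should give the five rows the question asks for, and dropDuplicates() would no longer be needed, but it has not been run against the spark-xml output shown here.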