How to parse xml string column with pyspark


I am trying to parse multiple xml files with pyspark. All xml files have the same known schema.
First I load all the files as text into a Spark DF:

path = 'c://path/to/xml/files/*.xml'
df = spark.read.text(path)

At this point my DF looks like this:

 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|value                                                                                                                                                                                                            |
 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 
|<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>|
|<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>|
 -------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- 

The xml content has the following (known) schema:
root
 |-- Header: struct (nullable = false)
 |    |-- tag1: string (nullable = false)
 |    |-- tag2: integer (nullable = false)
 |    |-- tag3: timestamp (nullable = false)
 |-- Body: struct (nullable = false)
 |    |-- Pair: array (nullable = false)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- N: string (nullable = false)
 |    |    |    |-- V: string (nullable = false)

So the final output after parsing should look like this:

 --------- ----- ------------------------ --- -- 
|tag1     | tag2| tag3                   | N |V |
 --------- ----- ------------------------ --- -- 
|some str1| 2   |2022-02-16 10:39:26.730 |N1 |V1|
|some str1| 2   |2022-02-16 10:39:26.730 |N2 |V2|
|some str1| 2   |2022-02-16 10:39:26.730 |N3 |V3|
|some str2| 5   |2022-02-17 10:39:26.730 |N4 |V4|
|some str2| 5   |2022-02-17 10:39:26.730 |N5 |V5|
 --------- ----- ------------------------ --- -- 

Meaning "Header" element should repeat itself for all NV pairs that come from the same xml string.
So I think I found a way to extract all the header tags with xpath or xml.etree.ElementTree but my problem is that I don't really understand how to extract my NV pairs to something that I can explode later.

What am I missing?

CodePudding user response:

Give the spark-xml package a try:

from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

df = spark.read.format('xml').options(rowTag='book').load('books.xml')

See https://github.com/databricks/spark-xml#python-api

CodePudding user response:

Depending on your Spark version, you have to add the matching spark-xml package to the environment. I am using Spark 2.4.0, and this Databricks spark-xml version worked for me:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.databricks:spark-xml_2.11:0.7.0 pyspark-shell'

The input file (src/input/input.xml) looks like the following:

<Msg><Header><tag1>some str1</tag1><tag2>2</tag2><tag3>2022-02-16 10:39:26.730</tag3></Header><Body><Pair><N>N1</N><V>V1</V></Pair><Pair><N>N2</N><V>V2</V></Pair><Pair><N>N3</N><V>V3</V></Pair></Body></Msg>
<Msg><Header><tag1>some str2</tag1><tag2>5</tag2><tag3>2022-02-17 10:39:26.730</tag3></Header><Body><Pair><N>N4</N><V>V4</V></Pair><Pair><N>N5</N><V>V5</V></Pair></Body></Msg>

input_path = 'src/input/input.xml'
xmlDF = spark.read.format('xml').option('rowTag', 'Msg').load(input_path)

xmlDF.printSchema()
root
 |-- Body: struct (nullable = true)
 |    |-- Pair: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- N: string (nullable = true)
 |    |    |    |-- V: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- tag1: string (nullable = true)
 |    |-- tag2: long (nullable = true)
 |    |-- tag3: timestamp (nullable = true)

You can't call explode on two lists in the same select, but you don't need to: exploding N and V separately would produce a cross product of unrelated values. Instead, explode the array of structs once, then unpack N and V from the exploded struct:

from pyspark.sql.functions import col, explode

xmlDF.select(
        col("Header.tag1").alias('tag1'),
        col("Header.tag2").alias('tag2'),
        col("Header.tag3").alias('tag3'),
        explode("Body.Pair").alias('pair')
    ).select(
        'tag1',
        'tag2',
        'tag3',
        col("pair.N").alias('N'),
        col("pair.V").alias('V'),
    ).show(truncate=False)

It will give the following result based on your input:

 --------- ---- ---------------------- --- --- 
|tag1     |tag2|tag3                  |N  |V  |
 --------- ---- ---------------------- --- --- 
|some str1|2   |2022-02-16 10:39:26.73|N1 |V1 |
|some str1|2   |2022-02-16 10:39:26.73|N2 |V2 |
|some str1|2   |2022-02-16 10:39:26.73|N3 |V3 |
|some str2|5   |2022-02-17 10:39:26.73|N4 |V4 |
|some str2|5   |2022-02-17 10:39:26.73|N5 |V5 |
 --------- ---- ---------------------- --- --- 
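
Since the question mentions xml.etree.ElementTree: as an alternative to spark-xml, you could keep the text DataFrame from the question and flatten each row with a plain-Python helper, then flatMap it into rows. A minimal sketch, assuming the known schema from the question (parse_msg is a made-up name, and tag3 is kept as a plain string here rather than a timestamp):

```python
import xml.etree.ElementTree as ET

def parse_msg(xml_string):
    """Flatten one <Msg> string into (tag1, tag2, tag3, N, V) tuples,
    repeating the header values once per <Pair> element in <Body>."""
    root = ET.fromstring(xml_string)
    header = root.find("Header")
    tag1 = header.findtext("tag1")
    tag2 = int(header.findtext("tag2"))
    tag3 = header.findtext("tag3")
    return [
        (tag1, tag2, tag3, pair.findtext("N"), pair.findtext("V"))
        for pair in root.iter("Pair")
    ]

msg = ("<Msg><Header><tag1>some str1</tag1><tag2>2</tag2>"
       "<tag3>2022-02-16 10:39:26.730</tag3></Header>"
       "<Body><Pair><N>N1</N><V>V1</V></Pair>"
       "<Pair><N>N2</N><V>V2</V></Pair></Body></Msg>")
print(parse_msg(msg))

# With the question's text DataFrame this could be applied as, e.g.:
#   df.rdd.flatMap(lambda row: parse_msg(row.value)) \
#         .toDF(['tag1', 'tag2', 'tag3', 'N', 'V'])
```

This sidesteps the extra spark-xml dependency, at the cost of doing schema inference and type casting by hand.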