Creating MAPTYPE field from multiple columns - Spark SQL


I have a use case wherein multiple keys are distributed across the dataset in JSON format, and these need to be aggregated into a consolidated result set for further processing.

I have been able to develop a code structure that achieves this using both the Python API (PySpark) and Spark SQL, but the latter is a more convoluted and tedious way of doing it, and involves intermediate conversions which could lead to errors down the line.

Using the snippets below, is there a better way to achieve this in Spark SQL, by creating a MAP<STRING,ARRAY<STRING>> from key and value?

Data Preparation

from pyspark.sql import functions as F  # needed for F.from_json / F.col below
from pyspark.sql.types import *
import pandas as pd
from io import StringIO

s = StringIO("""
id|json_struct
1|{"a":["tyeqb","",""],"e":["qwrqc","",""]}
1|{"t":["sartq","",""],"r":["fsafsq","",""]}
1|{"b":["puhqiqh","",""],"e":["hjfsaj","",""]}
2|{"b":["basajhjwa","",""],"e":["asfafas","",""]}
2|{"n":["gaswq","",""],"r":["sar","",""],"l":["sar","",""],"s":["rqqrq","",""],"m":["wrqwrq","",""]}
2|{"s":["tqqwjh","",""],"t":["afs","",""],"l":["fsaafs","",""]}
""")
df = pd.read_csv(s,delimiter='|')

sparkDF = spark.createDataFrame(df)

sparkDF.createOrReplaceTempView("INPUT")  # registerTempTable is deprecated since Spark 2.0

sparkDF = sparkDF.withColumn(
    'json_struct',
    F.from_json(F.col('json_struct'), schema=MapType(StringType(), ArrayType(StringType()), True))
)

sparkDF.show(truncate=False)

+---+---------------------------------------------------------------------------------------+
|id |json_struct                                                                            |
+---+---------------------------------------------------------------------------------------+
|1  |{a -> [tyeqb, , ], e -> [qwrqc, , ]}                                                   |
|1  |{t -> [sartq, , ], r -> [fsafsq, , ]}                                                  |
|1  |{b -> [puhqiqh, , ], e -> [hjfsaj, , ]}                                                |
|2  |{b -> [basajhjwa, , ], e -> [asfafas, , ]}                                             |
|2  |{n -> [gaswq, , ], r -> [sar, , ], l -> [sar, , ], s -> [rqqrq, , ], m -> [wrqwrq, , ]}|
|2  |{s -> [tqqwjh, , ], t -> [afs, , ], l -> [fsaafs, , ]}                                 |
+---+---------------------------------------------------------------------------------------+
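
As a quick sanity check (my addition, not part of the original post), printSchema should confirm that the parsed column is MAP<STRING,ARRAY<STRING>>:

sparkDF.printSchema()
# root
#  |-- id: long (nullable = true)
#  |-- json_struct: map (nullable = true)
#  |    |-- key: string
#  |    |-- value: array (valueContainsNull = true)
#  |    |    |-- element: string (containsNull = true)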

Python API (PySpark) - Implementation

As you can see, the key column produced by explode is natively a STRING type, and since PySpark has create_map, which is not available within Spark SQL, it can be used directly to generate the final json_struct column, ensuring a single key with a variable-length ARRAY<STRING> value.

(sparkDF
    .select('id', F.explode('json_struct'))                      # explode the map -> columns: key, value
    .withColumn('value', F.filter('value', lambda x: x != ''))   # drop empty strings from each array
    .withColumn('value', F.concat_ws(',', 'value'))              # join the remaining elements into one string
    .groupBy('id', 'key')
    .agg(F.collect_set('value').alias('value'))                  # gather distinct values per (id, key)
    .withColumn('json_struct', F.to_json(F.create_map('key', 'value')))
    .orderBy('id')
    .show(truncate=False)
)

+---+---+---------------+------------------------+
|id |key|value          |json_struct             |
+---+---+---------------+------------------------+
|1  |a  |[tyeqb]        |{"a":["tyeqb"]}         |
|1  |e  |[hjfsaj, qwrqc]|{"e":["hjfsaj","qwrqc"]}|
|1  |r  |[fsafsq]       |{"r":["fsafsq"]}        |
|1  |b  |[puhqiqh]      |{"b":["puhqiqh"]}       |
|1  |t  |[sartq]        |{"t":["sartq"]}         |
|2  |b  |[basajhjwa]    |{"b":["basajhjwa"]}     |
|2  |n  |[gaswq]        |{"n":["gaswq"]}         |
|2  |t  |[afs]          |{"t":["afs"]}           |
|2  |s  |[tqqwjh, rqqrq]|{"s":["tqqwjh","rqqrq"]}|
|2  |e  |[asfafas]      |{"e":["asfafas"]}       |
|2  |l  |[sar, fsaafs]  |{"l":["sar","fsaafs"]}  |
|2  |r  |[sar]          |{"r":["sar"]}           |
|2  |m  |[wrqwrq]       |{"m":["wrqwrq"]}        |
+---+---+---------------+------------------------+

Spark SQL - Implementation

Within this implementation, I have to take additional steps to ensure that both the key and value columns are of ARRAY type and of consistent lengths, since map_from_arrays takes arrays as inputs.

Is there a way to bypass these and create a similar schema as depicted using Python API?
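
For context, MAP_FROM_ARRAYS pairs two arrays element-wise into a map, which is why both sides have to be wrapped. A minimal sketch of its behavior (my own illustration, not from the original post):

spark.sql("SELECT MAP_FROM_ARRAYS(ARRAY('a'), ARRAY(ARRAY('x','y'))) AS m").show(truncate=False)
# +-------------+
# |m            |
# +-------------+
# |{a -> [x, y]}|
# +-------------+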

spark.sql("""
        SELECT
            id,
            KEY,
            VALUE,
            TO_JSON(MAP_FROM_ARRAYS(KEY,VALUE)) as json_struct
        FROM (
            SELECT
                id,
                key,
                ARRAY(COLLECT_SET( value )) as value -- <------- ### Ensuring Value is NESTED ARRAY
            FROM (
                SELECT
                    id,
                    SPLIT(k,'|',1) as key,   -- <------- ### wrap key into a single-element array (limit 1 prevents splitting)
                    CONCAT_WS(',',FILTER(v,x -> x != '')) as value
                FROM (
                        SELECT 
                            id,
                            EXPLODE(FROM_JSON(json_struct,'MAP<STRING,ARRAY<STRING>>')) as (k,v)
                        FROM INPUT
                    )
            )
            GROUP BY 1,2
        )
    ORDER BY 1
""").show(truncate=False)

+---+---+-----------------+------------------------+
|id |KEY|VALUE            |json_struct             |
+---+---+-----------------+------------------------+
|1  |[a]|[[tyeqb]]        |{"a":["tyeqb"]}         |
|1  |[e]|[[hjfsaj, qwrqc]]|{"e":["hjfsaj","qwrqc"]}|
|1  |[b]|[[puhqiqh]]      |{"b":["puhqiqh"]}       |
|1  |[r]|[[fsafsq]]       |{"r":["fsafsq"]}        |
|1  |[t]|[[sartq]]        |{"t":["sartq"]}         |
|2  |[n]|[[gaswq]]        |{"n":["gaswq"]}         |
|2  |[b]|[[basajhjwa]]    |{"b":["basajhjwa"]}     |
|2  |[t]|[[afs]]          |{"t":["afs"]}           |
|2  |[s]|[[tqqwjh, rqqrq]]|{"s":["tqqwjh","rqqrq"]}|
|2  |[e]|[[asfafas]]      |{"e":["asfafas"]}       |
|2  |[l]|[[sar, fsaafs]]  |{"l":["sar","fsaafs"]}  |
|2  |[r]|[[sar]]          |{"r":["sar"]}           |
|2  |[m]|[[wrqwrq]]       |{"m":["wrqwrq"]}        |
+---+---+-----------------+------------------------+

CodePudding user response:

Spark SQL has map instead of create_map; it builds a map directly from key/value expressions, so the array-wrapping steps aren't needed.
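
As a minimal illustration of map (my own snippet, not part of the original answer):

spark.sql("SELECT TO_JSON(MAP('e', ARRAY('qwrqc','hjfsaj'))) AS js").show(truncate=False)
# +------------------------+
# |js                      |
# +------------------------+
# |{"e":["qwrqc","hjfsaj"]}|
# +------------------------+

Your PySpark code could then be translated into this: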

df = spark.sql("""
    WITH
    TBL2 AS (SELECT id, EXPLODE(FROM_JSON(json_struct, 'MAP<STRING,ARRAY<STRING>>')) FROM INPUT),
    TBL3 AS (SELECT id, key, FLATTEN(COLLECT_SET(FILTER(value, x -> x != ''))) AS value
             FROM TBL2
             GROUP BY id, key)
    SELECT *, TO_JSON(MAP(key, value)) AS json_struct
    FROM TBL3
""")

df.show(truncate=0)
# +---+---+---------------+------------------------+
# |id |key|value          |json_struct             |
# +---+---+---------------+------------------------+
# |1  |a  |[tyeqb]        |{"a":["tyeqb"]}         |
# |1  |e  |[qwrqc, hjfsaj]|{"e":["qwrqc","hjfsaj"]}|
# |1  |b  |[puhqiqh]      |{"b":["puhqiqh"]}       |
# |1  |r  |[fsafsq]       |{"r":["fsafsq"]}        |
# |1  |t  |[sartq]        |{"t":["sartq"]}         |
# |2  |b  |[basajhjwa]    |{"b":["basajhjwa"]}     |
# |2  |n  |[gaswq]        |{"n":["gaswq"]}         |
# |2  |s  |[rqqrq, tqqwjh]|{"s":["rqqrq","tqqwjh"]}|
# |2  |t  |[afs]          |{"t":["afs"]}           |
# |2  |e  |[asfafas]      |{"e":["asfafas"]}       |
# |2  |l  |[fsaafs, sar]  |{"l":["fsaafs","sar"]}  |
# |2  |r  |[sar]          |{"r":["sar"]}           |
# |2  |m  |[wrqwrq]       |{"m":["wrqwrq"]}        |
# +---+---+---------------+------------------------+
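
Note that COLLECT_SET gives no guarantee on element order, which is why e.g. key e comes back as ["qwrqc","hjfsaj"] here but as ["hjfsaj","qwrqc"] in your PySpark run; apply SORT_ARRAY (or ARRAY_SORT) to the flattened value if deterministic ordering matters.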