Select columns from highly nested data


For the DataFrame below, which was generated from an Avro file, I'm trying to get the column names as a list (or some other format) so that I can use them in a select statement. node1 and node2 have the same elements. I understand that I could do df.select(col('data.node1.name')), but I'm not sure

  1. how to select all columns at once without hardcoding all the column names, and
  2. how to handle the nested part. To keep it readable, I think productvalues and porders should be selected into separate dataframes/tables?

Input schema:

  |-- metadata: struct
  |-- data: struct
  |    |-- node1: struct
  |    |    |-- name: string
  |    |    |-- productlist: array
  |    |    |    |-- element: struct
  |    |    |    |    |-- productvalues: array
  |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |-- pname: string
  |    |    |    |    |    |    |-- porders: array
  |    |    |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |    |    |-- ordernum: int
  |    |    |    |    |    |    |    |    |-- field: string
  |    |-- node2: struct
  |    |    |-- name: string
  |    |    |-- productlist: array
  |    |    |    |-- element: struct
  |    |    |    |    |-- productvalues: array
  |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |-- pname: string
  |    |    |    |    |    |    |-- porders: array
  |    |    |    |    |    |    |    |-- element: struct
  |    |    |    |    |    |    |    |    |-- ordernum: int
  |    |    |    |    |    |    |    |    |-- field: string

Answer:

Instead of gathering all the data in one table, I would recommend making a separate table for each list. To get the values out of a list (array), you can use the explode function.

For example, to make the productlist table:

from pyspark.sql.functions import col, explode

productlist = df.select(col('data.node1.name').alias("name"), explode(col('data.node1.productlist')).alias("first_explode"))

In the next step, you can use the productlist DataFrame like this:

productValue = productlist.select(col('name'), explode(col('first_explode.productvalues')).alias("second_explode"))

And so on for the deeper levels (porders).

This article may also help: https://sparkbyexamples.com/pyspark/pyspark-explode-array-and-map-columns-to-rows/
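The chain of explodes above can be modelled in plain Python to see what each step produces. This is not Spark code: dicts stand in for structs and lists for arrays, and the helpers `get_path` and `explode_rows` (and the alias `third_explode`) are hypothetical. Like Spark's explode, the sketch emits one row per array element and no rows at all for a null or empty array.

```python
def get_path(row, path):
    """Follow a dotted path such as 'data.node1.name' through nested dicts."""
    for key in path.split("."):
        row = row.get(key) if isinstance(row, dict) else None
    return row


def explode_rows(rows, keep, array_path, alias):
    """Rough analogue of df.select(*keep, explode(array_path).alias(alias)).

    `keep` maps each output column name to the dotted path it is copied from.
    """
    out = []
    for row in rows:
        # like explode(), produce nothing for a null or empty array
        for element in get_path(row, array_path) or []:
            new_row = {name: get_path(row, p) for name, p in keep.items()}
            new_row[alias] = element
            out.append(new_row)
    return out


# Toy data shaped like data.node1 (hypothetical values)
node1 = {
    "name": "Node001",
    "productlist": [
        {"productvalues": [
            {"pname": "P001", "porders": [{"ordernum": 1, "field": "f1"},
                                          {"ordernum": 2, "field": "f2"}]},
        ]},
        {"productvalues": [
            {"pname": "P002", "porders": [{"ordernum": 3, "field": "f3"}]},
        ]},
    ],
}
rows = [{"data": {"node1": node1}}]

productlist = explode_rows(rows, {"name": "data.node1.name"},
                           "data.node1.productlist", "first_explode")
product_values = explode_rows(productlist, {"name": "name"},
                              "first_explode.productvalues", "second_explode")
porders = explode_rows(product_values,
                       {"name": "name", "pname": "second_explode.pname"},
                       "second_explode.porders", "third_explode")
print(len(productlist), len(product_values), len(porders))
```

Each level multiplies the rows by the size of its array, which is why keeping the per-level results as separate tables, as suggested above, can be easier to read than one wide table.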

Answer:

This is not a perfect answer, but I hope it gives you some ideas for solving your problem. I know you said you don't want to hardcode your column names, but I haven't been able to handle that part yet.

First, I created this sample JSON for testing:

{
    "metadata": {},
    "data": {
        "node1": {
            "name": "Node001",
            "productlist": [
                {"productvalues": [
                    {"pname": "Node001-P001", "porders": [
                        {"ordernum":  1, "field": "Node001-P001-001"},
                        {"ordernum":  2, "field": "Node001-P001-002"}
                    ]},
                    {"pname": "Node001-P002", "porders": [
                        {"ordernum":  3, "field": "Node001-P002-003"},
                        {"ordernum":  4, "field": "Node001-P002-004"},
                        {"ordernum":  5, "field": "Node001-P002-005"},
                        {"ordernum":  6, "field": "Node001-P002-006"}
                    ]},
                    {"pname": "Node001-P003", "porders": [
                        {"ordernum":  7, "field": "Node001-P003-007"}
                    ]}
                ]},
                {"productvalues": [
                    {"pname": "Node001-P004", "porders": [
                        {"ordernum":  8, "field": "Node001-P004-008"},
                        {"ordernum":  9, "field": "Node001-P004-009"},
                        {"ordernum": 10, "field": "Node001-P004-010"}
                    ]},
                    {"pname": "Node001-P005", "porders": [
                        {"ordernum": 11, "field": "Node001-P005-011"},
                        {"ordernum": 12, "field": "Node001-P005-012"},
                        {"ordernum": 13, "field": "Node001-P005-013"}
                    ]}
                ]}
            ]
        },
        "node2": {
            "name": "Node002",
            "productlist": [
                {"productvalues": [
                    {"pname": "Node002-P001", "porders": [
                        {"ordernum": 14, "field": "Node002-P001-014"}
                    ]},
                    {"pname": "Node002-P002", "porders": [
                        {"ordernum": 15, "field": "Node002-P002-015"}
                    ]},
                    {"pname": "Node002-P003", "porders": [
                        {"ordernum": 16, "field": "Node002-P003-016"}
                    ]}
                ]},
                {"productvalues": [
                    {"pname": "Node002-P004", "porders": [
                        {"ordernum": 17, "field": "Node002-P004-017"}
                    ]},
                    {"pname": "Node002-P005", "porders": [
                        {"ordernum": 18, "field": "Node002-P005-018"}
                    ]}
                ]}
            ]
        }
    }
}

Next, define a list of dicts that describes, level by level, which columns to select ('col') and which arrays to explode ('exp'):

cols_dict = [
    {'col': ['data.node1.name'], 'exp': ['data.node1.productlist']},
    {'exp': ['productlist.productvalues']},
    {'col': ['productvalues.pname'], 'exp': ['productvalues.porders']},
    {'col': ['porders.ordernum', 'porders.field']},
]

Finally, loop through this list, running one select (with its explodes) per entry, to get the final result:

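from pyspark.sql import functions as F

dfx = df
select_col = []
for i in cols_dict:
    # columns selected in the previous step are now top-level: keep only their short names
    select_col = [c.split('.')[-1] for c in select_col]
    if i.get('col'):
        select_col += i['col']
    select_exp = []
    if i.get('exp'):
        select_exp += i['exp']

    # plain columns first, then one explode per array, aliased to its last path segment
    dfx = dfx.select([F.col(c) for c in select_col] + [F.explode(c).alias(c.split('.')[-1]) for c in select_exp])

dfx.show()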

+-------+------------+--------+----------------+
|   name|       pname|ordernum|           field|
+-------+------------+--------+----------------+
|Node001|Node001-P001|       1|Node001-P001-001|
|Node001|Node001-P001|       2|Node001-P001-002|
|Node001|Node001-P002|       3|Node001-P002-003|
|Node001|Node001-P002|       4|Node001-P002-004|
|Node001|Node001-P002|       5|Node001-P002-005|
|Node001|Node001-P002|       6|Node001-P002-006|
|Node001|Node001-P003|       7|Node001-P003-007|
|Node001|Node001-P004|       8|Node001-P004-008|
|Node001|Node001-P004|       9|Node001-P004-009|
|Node001|Node001-P004|      10|Node001-P004-010|
|Node001|Node001-P005|      11|Node001-P005-011|
|Node001|Node001-P005|      12|Node001-P005-012|
|Node001|Node001-P005|      13|Node001-P005-013|
+-------+------------+--------+----------------+
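To see why the loop ends with exactly the columns name, pname, ordernum and field, the sketch below replays only its column-name bookkeeping in plain Python, with no Spark involved. It assumes, as in PySpark, that selecting F.col('a.b') yields a column named b; each exploded array is aliased to its own last path segment. The function name `trace_columns` is hypothetical.

```python
# The cols_dict from the answer
cols_dict = [
    {'col': ['data.node1.name'], 'exp': ['data.node1.productlist']},
    {'exp': ['productlist.productvalues']},
    {'col': ['productvalues.pname'], 'exp': ['productvalues.porders']},
    {'col': ['porders.ordernum', 'porders.field']},
]


def trace_columns(cols_dict):
    """Replay the loop's column-name bookkeeping; return the names after each select."""
    steps = []
    select_col = []
    for i in cols_dict:
        # previously selected columns are now top-level, so only their short names remain
        select_col = [c.split('.')[-1] for c in select_col]
        select_col += i.get('col', [])
        select_exp = i.get('exp', [])
        # F.col('a.b') comes out named 'b'; each explode is aliased to its last segment
        steps.append([c.split('.')[-1] for c in select_col + select_exp])
    return steps


for step, names in enumerate(trace_columns(cols_dict), start=1):
    print(step, names)
```

Note that an exploded column is never carried into select_col; it is only consumed by the next entry's 'exp' path (for example 'productlist.productvalues'), which is why the entries must be grouped exactly as shown.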

Answer:

With the following approach, you won't need to hardcode all the struct fields, but you will need to provide a list of the columns/fields whose type is array of struct. You have 3 such fields; we will also turn the "data" column into one, so there will be 4 in total.

First of all, a DataFrame similar to yours:

from pyspark.sql import functions as F

# assumes an active SparkSession named `spark` (as in the pyspark shell or a notebook)
df = spark.createDataFrame(
    [(
        ('a', 'b'),  # metadata
        (            # data
            ('name_1', [  # node1: (name, productlist)
                ([('pname_111', [(1111, 'field_1111'), (1112, 'field_1112')]),
                  ('pname_112', [(1121, 'field_1121'), (1122, 'field_1122')])],),
                ([('pname_121', [(1211, 'field_1211'), (1212, 'field_1212')]),
                  ('pname_122', [(1221, 'field_1221'), (1222, 'field_1222')])],),
            ]),
            ('name_2', [  # node2: (name, productlist)
                ([('pname_211', [(2111, 'field_2111'), (2112, 'field_2112')]),
                  ('pname_212', [(2121, 'field_2121'), (2122, 'field_2122')])],),
                ([('pname_221', [(2211, 'field_2211'), (2212, 'field_2212')]),
                  ('pname_222', [(2221, 'field_2221'), (2222, 'field_2222')])],),
            ]),
        ),
    )],
    'metadata:struct<fld1:string,fld2:string>, data:struct<node1:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>, node2:struct<name:string, productlist:array<struct<productvalues:array<struct<pname:string, porders:array<struct<ordernum:int, field:string>>>>>>>>'
)
# df.printSchema()
# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- data: struct (nullable = true)
#  |    |-- node1: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
#  |    |-- node2: struct (nullable = true)
#  |    |    |-- name: string (nullable = true)
#  |    |    |-- productlist: array (nullable = true)
#  |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |-- productvalues: array (nullable = true)
#  |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |-- pname: string (nullable = true)
#  |    |    |    |    |    |    |-- porders: array (nullable = true)
#  |    |    |    |    |    |    |    |-- element: struct (containsNull = true)
#  |    |    |    |    |    |    |    |    |-- ordernum: integer (nullable = true)
#  |    |    |    |    |    |    |    |    |-- field: string (nullable = true)
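The answer below still needs the names of the array-of-struct fields. As a possible extension (not part of the original answer), they could be collected from the schema instead of being typed by hand. A minimal sketch, assuming the schema is the dict returned by PySpark's df.schema.jsonValue() and that each array-of-struct field name is shared across nodes, as it is here. The hypothetical `array_of_struct_fields` walks the schema depth-first and returns those names outermost first, each once; arrays nested directly inside arrays, and maps, are not handled. The `struct_type`/`array_type` builders are also hypothetical and only recreate the question's schema in that JSON shape for a quick check.

```python
def struct_type(**fields):
    """Build a struct type in the shape used by Spark's schema JSON."""
    return {"type": "struct",
            "fields": [{"name": n, "type": t, "nullable": True, "metadata": {}}
                       for n, t in fields.items()]}


def array_type(element):
    """Build an array type in the shape used by Spark's schema JSON."""
    return {"type": "array", "elementType": element, "containsNull": True}


def is_array_of_struct(dtype):
    return (isinstance(dtype, dict) and dtype.get("type") == "array"
            and isinstance(dtype["elementType"], dict)
            and dtype["elementType"].get("type") == "struct")


def array_of_struct_fields(schema):
    """Names of array<struct> fields in a schema JSON dict, outermost first, each once."""
    found = []

    def visit(dtype):
        if not (isinstance(dtype, dict) and dtype.get("type") == "struct"):
            return  # primitives such as "string" have no nested fields
        for field in dtype["fields"]:
            ftype = field["type"]
            if is_array_of_struct(ftype):
                if field["name"] not in found:
                    found.append(field["name"])
                visit(ftype["elementType"])
            else:
                visit(ftype)

    visit(schema)
    return found


# The question's schema, rebuilt in schema-JSON form
porders = array_type(struct_type(ordernum="integer", field="string"))
node = struct_type(name="string",
                   productlist=array_type(struct_type(
                       productvalues=array_type(struct_type(pname="string", porders=porders)))))
schema = struct_type(metadata=struct_type(fld1="string", fld2="string"),
                     data=struct_type(node1=node, node2=node))

print(["data"] + array_of_struct_fields(schema))
```

With a real DataFrame, the call would presumably be ["data"] + array_of_struct_fields(df.schema.jsonValue()), run on the original df before "data" is converted.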

The answer

  • Spark 3.1+ (Column.withField is available from Spark 3.1):

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))
    df = df.withColumn("data", F.array("data.*"))
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )

  • Lower Spark versions:

    nodes = df.select("data.*").columns
    for n in nodes:
        df = df.withColumn(
            "data",
            F.struct(
                F.struct(F.lit(n).alias("node"), f"data.{n}.*").alias(n),
                *[f"data.{c}" for c in df.select("data.*").columns if c != n]
            )
        )
    df = df.withColumn("data", F.array("data.*"))
    for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
        df = df.select(
            *[c for c in df.columns if c != arr_of_struct],
            F.expr(f"inline({arr_of_struct})")
        )


# root
#  |-- metadata: struct (nullable = true)
#  |    |-- fld1: string (nullable = true)
#  |    |-- fld2: string (nullable = true)
#  |-- node: string (nullable = false)
#  |-- name: string (nullable = true)
#  |-- pname: string (nullable = true)
#  |-- ordernum: integer (nullable = true)
#  |-- field: string (nullable = true)

# +--------+-----+------+---------+--------+----------+
# |metadata| node|  name|    pname|ordernum|     field|
# +--------+-----+------+---------+--------+----------+
# |  {a, b}|node1|name_1|pname_111|    1111|field_1111|
# |  {a, b}|node1|name_1|pname_111|    1112|field_1112|
# |  {a, b}|node1|name_1|pname_112|    1121|field_1121|
# |  {a, b}|node1|name_1|pname_112|    1122|field_1122|
# |  {a, b}|node1|name_1|pname_121|    1211|field_1211|
# |  {a, b}|node1|name_1|pname_121|    1212|field_1212|
# |  {a, b}|node1|name_1|pname_122|    1221|field_1221|
# |  {a, b}|node1|name_1|pname_122|    1222|field_1222|
# |  {a, b}|node2|name_2|pname_211|    2111|field_2111|
# |  {a, b}|node2|name_2|pname_211|    2112|field_2112|
# |  {a, b}|node2|name_2|pname_212|    2121|field_2121|
# |  {a, b}|node2|name_2|pname_212|    2122|field_2122|
# |  {a, b}|node2|name_2|pname_221|    2211|field_2211|
# |  {a, b}|node2|name_2|pname_221|    2212|field_2212|
# |  {a, b}|node2|name_2|pname_222|    2221|field_2221|
# |  {a, b}|node2|name_2|pname_222|    2222|field_2222|
# +--------+-----+------+---------+--------+----------+


nodes = df.select("data.*").columns
for n in nodes:
    df = df.withColumn("data", F.col("data").withField(n, F.struct(F.lit(n).alias("node"), f"data.{n}.*")))

With the above, I keep the node title in case you need it. First, the list of nodes is taken from the fields of the "data" column. Then, for each node, the for loop adds one more field, "node", inside that node's struct, holding the node's title.

df = df.withColumn("data", F.array("data.*"))

The above converts the "data" column from a struct into an array of structs, so that in the next step it can be exploded with inline just like the other arrays.
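In plain-Python terms (dicts standing in for structs and lists for arrays; not Spark code), these two steps turn the "data" struct into a list of node structs, each tagged with its own field name. The function `tag_and_collect` is hypothetical. It also hints at why F.array works here: node1 and node2 end up with identical fields, and a Spark array needs its elements to share one type.

```python
def tag_and_collect(data):
    """Plain-Python picture of the withField loop followed by F.array("data.*")."""
    # withField(n, struct(lit(n).alias("node"), data.n.*)): put the title first in each node
    tagged = {n: {"node": n, **fields} for n, fields in data.items()}
    # F.array("data.*"): each field of the struct becomes one array element, in order
    return list(tagged.values())


data = {
    "node1": {"name": "name_1", "productlist": []},
    "node2": {"name": "name_2", "productlist": []},
}
print(tag_and_collect(data))
```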

for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    df = df.select(
        *[c for c in df.columns if c != arr_of_struct],
        F.expr(f"inline({arr_of_struct})")
    )

In the above, the key line is F.expr(f"inline({arr_of_struct})"). inline is a generator that explodes an array of structs into rows, with one column per struct field. Because generators cannot be nested in Spark, it has to be applied one level at a time inside a loop. At this point there are 4 arrays of structs, so 4 inline expressions are applied in turn.
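To illustrate what the four inline passes produce, here is a plain-Python model (not Spark code; the helper `inline_rows` is hypothetical). Each pass replaces one array-of-struct column with one row per element and one column per struct field, keeping the other columns first, like select(*others, inline(col)). It runs on a trimmed, hypothetical subset of the sample data above, after the node title has been added and "data" turned into an array.

```python
def inline_rows(rows, column):
    """Rough analogue of df.select(*other_columns, inline(column)) on dicts."""
    out = []
    for row in rows:
        others = {k: v for k, v in row.items() if k != column}
        # inline() emits nothing for a null or empty array
        for element in row[column] or []:
            out.append({**others, **element})
    return out


rows = [{
    "metadata": ("a", "b"),
    "data": [
        {"node": "node1", "name": "name_1", "productlist": [
            {"productvalues": [
                {"pname": "pname_111", "porders": [
                    {"ordernum": 1111, "field": "field_1111"},
                    {"ordernum": 1112, "field": "field_1112"}]}]}]},
        {"node": "node2", "name": "name_2", "productlist": [
            {"productvalues": [
                {"pname": "pname_211", "porders": [
                    {"ordernum": 2111, "field": "field_2111"}]}]}]},
    ],
}]

# one pass per array-of-struct level, exactly like the loop in the answer
for arr_of_struct in ["data", "productlist", "productvalues", "porders"]:
    rows = inline_rows(rows, arr_of_struct)

for row in rows:
    print(row)
```

The resulting keys come out as metadata, node, name, pname, ordernum, field, the same column order as the final schema shown above.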
