Home > Net >  Generate a nested nested structure in pyspark
Generate a nested nested structure in pyspark

Time:06-13

I have the following DF:

 ---- ------ -------- ---- ---- ---- ---- ---------- 
| ID | Name |  Vl1   | Vl2| Vl3| Vl4|Vl5 | Vl6      |
 ---- ------ -------- ---- ---- ---- ---- ---------- 
|1   |John  |    1.5 |null|null|null|   A|2022-01-01|
|1   |John  |    1   |null|null|null|   A|2022-01-01|
|1   |John  |    3   |null|1   |null|   A|2022-01-01|
|1   |John  |    4   |null|1   |null|   A|2022-01-01|
|2   |Ana   |    2.5 |null|null|null|   A|2022-01-01|
|2   |Ana   |    0   |null|null|null|   A|2022-01-01|
|2   |Ana   |    null|null|null|null|   A|2022-01-01|
|2   |Ana   |    2   |null|null|null|   A|2022-01-01|
|2   |Ana   |    2   |2   |null|null|   A|2022-01-01|
|2   |Ana   |    1   |null|null|null|   A|2022-01-01|
|3   |Paul  |    5   |null|null|null|   A|2022-01-01|
|3   |Paul  |    null|2   |null|null|   A|2022-01-01|
|3   |Paul  |    2.5 |null|2   |null|   A|2022-01-01|
|3   |Paul  |    null|null|3   |null|   A|2022-01-01|
 ---- ------ -------- ---- ---- ---- ---- ---------- 

How can I generate the following nested structure:

 |-- Title: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Client: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- Vl1: double (nullable = true)
 |    |    |-- Vl2: double (nullable = true)
 |    |    |-- Prch: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- Vl3: string (nullable = true)
 |    |    |    |    |-- Detail: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |-- Vl4: date (nullable = true)
 |    |    |    |    |-- Bs: array (nullable = true)
 |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |-- Vl5: string (nullable = true)
 |    |    |    |    |    |    |-- Vl6: date (nullable = true)

Until now, I used withcolumn to generate the columnns Tiltle and Company. Then I used groupby to group by clients and then I used collect_list to generate the first level of the nested structure, but how can I generate the other levels of the strucuture (Prch, Detail, Bs).

Just to know, my DF has 1 billion lines. I would like to know the best way to generate this structure. My spark environment has 4 workers with 5 cores by worker.

MVCE:

  data = [
 ("1","John",1.5,None,None,None,"A", "2022-01-01"),
 ("1","John",1.0,None,None,None,"A", "2022-01-01"),
 ("1","John",3.0,None,1.0,None,"A", "2022-01-01"),
 ("1","John",4.0,None,1.0,None,"A", "2022-01-01"),
 ("2","Ana",2.5,None,None,None,"A", "2022-01-01"),
 ("2","Ana",0.0,None,None,None,"A", "2022-01-01"),
 ("2","Ana",None,None,None,None,"A", "2022-01-01"),
 ("2","Ana",2.0,None,None,None,"A", "2022-01-01"),
 ("2","Ana",2.0,2.0,None,None,"A", "2022-01-01"),
 ("2","Ana",1.0,None,None,None,"A", "2022-01-01"),
 ("3","Paul",5.0,None,None,None,"A", "2022-01-01"),
 ("3","Paul",None,2.0,None,None,"A", "2022-01-01"),
 ("3","Paul",2.5,None,2.0,None,"A", "2022-01-01"),
 ("3","Paul",None,None,3.0,None,"A", "2022-01-01")
 ]

schema = StructType([
    StructField("Id", StringType(),True),
    StructField("Name", StringType(),True),
    StructField("Vl1", DoubleType(),True),
    StructField("Vl2", DoubleType(), True),
    StructField("Vl3", DoubleType(), True),
    StructField("Vl4", DateType(), True),
    StructField("Vl5", StringType(), True),
    StructField("Vl6", StringType(), True)
  ])

df = spark.createDataFrame(data=data,schema=schema)
df.printSchema()
df.show()

CodePudding user response:

You have to group them sequentially, for each level of nesting from the innermost and move outwards. The first line below is to create the Vl3, Details, Bs, then the second line is to create the Prch column, and 3rd is to create the Client column. (You ob don't have to create multiple data frames for each step, this is just for the sake of explanation.)

df1 = df.groupBy("Id", "Name", "Vl1", "Vl2", "Vl3").agg(fn.collect_list(fn.struct(fn.col("Vl4"))).alias("Detail"), fn.collect_list(fn.struct(fn.col("Vl5"), fn.col("Vl6"))).alias("Bs"))
df2 = df1.groupBy("Id", "Name", "Vl1", "Vl2").agg(fn.collect_list(fn.struct(fn.col("Vl3"), fn.col("Detail"), fn.col("Bs"))).alias("Prch"))
df3 = df2.groupBy("Id", "Name").agg(fn.collect_list(fn.struct(fn.col("Vl1"), fn.col("Vl2"), fn.col("Prch"))).alias("Client"))

Which gives you this schema:

root
 |-- Id: string (nullable = true)
 |-- Name: string (nullable = true)
 |-- Client: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- Vl1: double (nullable = true)
 |    |    |-- Vl2: double (nullable = true)
 |    |    |-- Prch: array (nullable = false)
 |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |-- Vl3: double (nullable = true)
 |    |    |    |    |-- Detail: array (nullable = false)
 |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |-- Vl4: date (nullable = true)
 |    |    |    |    |-- Bs: array (nullable = false)
 |    |    |    |    |    |-- element: struct (containsNull = false)
 |    |    |    |    |    |    |-- Vl5: string (nullable = true)
 |    |    |    |    |    |    |-- Vl6: string (nullable = true)
  • Related