PySpark - calculate the oldest date in a nested collection

Time:04-06

I have the following DataFrame:

 root
     |-- AUTHOR_ID: integer (nullable = false)
     |-- Books: array (nullable = true) 
     |    |-- element: struct (containsNull = true)
     |    |    |-- NAME: string (nullable = true)
     |    |    |-- DATE: timestamp (nullable = true)

How can I find the oldest published book for each author? I want to retrieve its date, for example:

{
 "AUTHOR_ID": 1,
 "FIRST_PUBLICATION": <Date>,
 "Books": "[ ... ]"
}

CodePudding user response:

There are many ways to do this. Let's try window functions. Given this DataFrame:

root
 |-- AUTHOR_ID: integer (nullable = false)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: date (nullable = true)

+---------+--------------------------------+
|AUTHOR_ID|Books                           |
+---------+--------------------------------+
|21       |[{Stories of Mary, 2019-12-01}] |
|34       |[{Sorrows of Mary, 2019-09-01}] |
|34       |[{Sparrows of Mary, 2019-06-16}]|
|21       |[{Songs of Mary, 2017-03-14}]   |
+---------+--------------------------------+

Following your edits:

from pyspark.sql import Window, functions as F

# Order each author's rows by book date, oldest first
win = Window.partitionBy('AUTHOR_ID').orderBy(F.asc('Books.DATE'))
df1 = (
    df.withColumn('rank', F.row_number().over(win))
      .where(F.col('rank') == 1).drop('rank')  # keep the oldest row per author
      .withColumn('value', F.to_json(F.struct(
          F.col('AUTHOR_ID'),
          F.col('Books.DATE').alias('FIRST_PUBLICATION'),
          'Books')))  # build the JSON column
      .select('value')
)
df1.show(truncate=False)


+-------------------------------------------------------------------------------------------------------------+
|value                                                                                                        |
+-------------------------------------------------------------------------------------------------------------+
|{"AUTHOR_ID":21,"FIRST_PUBLICATION":["2017-03-14"],"Books":[{"NAME":"Songs of Mary","DATE":"2017-03-14"}]}   |
|{"AUTHOR_ID":34,"FIRST_PUBLICATION":["2019-06-16"],"Books":[{"NAME":"Sparrows of Mary","DATE":"2019-06-16"}]}|
+-------------------------------------------------------------------------------------------------------------+
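
To make the window logic concrete, here is a plain-Python sketch (no Spark involved) of what "partition by AUTHOR_ID, order by date, keep row number 1" amounts to. The `rows` list and the `oldest_per_author` helper are hypothetical names used only for illustration; the data mirrors the sample table above.

```python
from datetime import date

# Hypothetical rows mirroring the sample data: (AUTHOR_ID, book name, book date)
rows = [
    (21, "Stories of Mary", date(2019, 12, 1)),
    (34, "Sorrows of Mary", date(2019, 9, 1)),
    (34, "Sparrows of Mary", date(2019, 6, 16)),
    (21, "Songs of Mary", date(2017, 3, 14)),
]


def oldest_per_author(rows):
    """Keep, for each author, the row with the earliest date."""
    oldest = {}
    for author_id, name, published in rows:
        current = oldest.get(author_id)
        # Replace the stored row only when this one is strictly older
        if current is None or published < current[1]:
            oldest[author_id] = (name, published)
    return oldest


first_books = oldest_per_author(rows)
for author_id, (name, published) in sorted(first_books.items()):
    print(author_id, name, published.isoformat())
```

Like `row_number()`, this keeps exactly one row per author even when two books share the same earliest date.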

CodePudding user response:

For Spark v3, higher-order functions are a good fit (the Python `aggregate` function is available from Spark 3.1). Starting from this sample DataFrame:

from datetime import datetime
from pyspark.sql import Row
from pyspark.sql import functions as f

df = spark.createDataFrame([("1", [Row(NAME="xs", DATE=datetime.strptime('2022-04-06 00:00:00', '%Y-%m-%d %H:%M:%S')),
                                       Row(NAME="s", DATE=datetime.strptime('2022-04-05 00:00:00', '%Y-%m-%d %H:%M:%S'))])],
                               'struct<AUTHOR_ID:string,Books:array<struct<NAME:string,DATE:timestamp>>>')

df.show(truncate=False)

+---------+-----------------------------------------------------+
|AUTHOR_ID|Books                                                |
+---------+-----------------------------------------------------+
|1        |[{xs, 2022-04-06 00:00:00}, {s, 2022-04-05 00:00:00}]|
+---------+-----------------------------------------------------+

df.printSchema()

root
 |-- AUTHOR_ID: string (nullable = true)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: timestamp (nullable = true)

We can get the earliest publication date for each author as follows:

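    # Fold over Books, starting from a far-future sentinel date and
    # keeping the smaller of the accumulator and each book's DATE
    df = df.withColumn('FIRST_PUBLICATION',
                  f.aggregate(
                      'Books',
                      f.lit(datetime.strptime('2222-02-22 22:22:22', '%Y-%m-%d %H:%M:%S')),
                      lambda acc, b : f.least(acc, b['DATE'])
                  )
           )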

Result

# df.show()
+---------+--------------------+-------------------+
|AUTHOR_ID|               Books|  FIRST_PUBLICATION|
+---------+--------------------+-------------------+
|        1|[{xs, 2022-04-06 ...|2022-04-05 00:00:00|
+---------+--------------------+-------------------+
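
For intuition, the fold performed by `aggregate` can be sketched in plain Python with `functools.reduce`. This is only an illustration of the logic, not Spark code: the `books` list, the `SENTINEL` constant, and the `least` and `first_publication` helpers are hypothetical names. The `least` helper skips `None`, mirroring how Spark's `least` skips nulls.

```python
from datetime import datetime
from functools import reduce

# Hypothetical in-memory copy of one author's Books array
books = [
    {"NAME": "xs", "DATE": datetime(2022, 4, 6)},
    {"NAME": "s", "DATE": datetime(2022, 4, 5)},
]

# Start value later than any real date (same idea as the 2222-02-22 literal)
SENTINEL = datetime(2222, 2, 22, 22, 22, 22)


def least(a, b):
    """Smaller of two values, ignoring None."""
    if a is None:
        return b
    if b is None:
        return a
    return min(a, b)


def first_publication(books):
    # The accumulator always holds the earliest date seen so far
    return reduce(lambda acc, b: least(acc, b["DATE"]), books, SENTINEL)


print(first_publication(books))
```

One consequence of this design is that an empty array yields the sentinel date itself, so it may be worth mapping that value back to null afterwards. If only the minimum is needed, `f.array_min('Books.DATE')` might be a shorter alternative; that is a suggestion that has not been run against this data.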