Pyspark - find the oldest date in a nested collection


I have the following DataFrame:

root
 |-- AUTHOR_ID: integer (nullable = false)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: timestamp (nullable = true)

How can I find the oldest published book for each author? I want to retrieve the date:

{
  "AUTHOR_ID": 1,
  "FIRST_PUBLICATION": <Date>,
  "Books": [ ... ]
}

CodePudding user response:

There are many ways of doing this; let's try window functions.

root
 |-- AUTHOR_ID: integer (nullable = false)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: date (nullable = true)

+---------+--------------------------------+
|AUTHOR_ID|Books                           |
+---------+--------------------------------+
|21       |[{Stories of Mary, 2019-12-01}] |
|34       |[{Sorrows of Mary, 2019-09-01}] |
|34       |[{Sparrows of Mary, 2019-06-16}]|
|21       |[{Songs of Mary, 2017-03-14}]   |
+---------+--------------------------------+
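
For reference, a minimal sketch that reproduces this sample data (assuming an active SparkSession named spark):

from datetime import date
from pyspark.sql import Row

df = spark.createDataFrame(
    [(21, [Row(NAME="Stories of Mary", DATE=date(2019, 12, 1))]),
     (34, [Row(NAME="Sorrows of Mary", DATE=date(2019, 9, 1))]),
     (34, [Row(NAME="Sparrows of Mary", DATE=date(2019, 6, 16))]),
     (21, [Row(NAME="Songs of Mary", DATE=date(2017, 3, 14))])],
    'struct<AUTHOR_ID:int,Books:array<struct<NAME:string,DATE:date>>>')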

Following your edits:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# rank 1 = oldest date per author
win = Window.partitionBy('AUTHOR_ID').orderBy(F.asc('Books.DATE'))
df1 = (
    df.withColumn('rank', F.row_number().over(win))
      .where(F.col('rank') == 1).drop('rank')  # filter by oldest date
      .withColumn('value', F.to_json(F.struct(  # create JSON column
          F.col('AUTHOR_ID'), F.col('Books.DATE').alias('FIRST_PUBLICATION'), 'Books')))
)
df1.select('value').show(truncate=False)


+-------------------------------------------------------------------------------------------------------------+
|value                                                                                                        |
+-------------------------------------------------------------------------------------------------------------+
|{"AUTHOR_ID":21,"FIRST_PUBLICATION":["2017-03-14"],"Books":[{"NAME":"Songs of Mary","DATE":"2017-03-14"}]}   |
|{"AUTHOR_ID":34,"FIRST_PUBLICATION":["2019-06-16"],"Books":[{"NAME":"Sparrows of Mary","DATE":"2019-06-16"}]}|
+-------------------------------------------------------------------------------------------------------------+
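
Note that Books.DATE on an array of structs extracts the DATE field from every element, which is why FIRST_PUBLICATION appears as a single-element array in the JSON above rather than a bare date.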

CodePudding user response:

For Spark 3+, using higher-order functions is the best solution:

from datetime import datetime
from pyspark.sql import Row

df = spark.createDataFrame(
    [("1", [Row(NAME="xs", DATE=datetime.strptime('2022-04-06 00:00:00', '%Y-%m-%d %H:%M:%S')),
            Row(NAME="s", DATE=datetime.strptime('2022-04-05 00:00:00', '%Y-%m-%d %H:%M:%S'))])],
    'struct<AUTHOR_ID:string,Books:array<struct<NAME:string,DATE:timestamp>>>')

df.show(truncate=False)

+---------+-----------------------------------------------------+
|AUTHOR_ID|Books                                                |
+---------+-----------------------------------------------------+
|1        |[{xs, 2022-04-06 00:00:00}, {s, 2022-04-05 00:00:00}]|
+---------+-----------------------------------------------------+

df.printSchema()

root
 |-- AUTHOR_ID: string (nullable = true)
 |-- Books: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- DATE: timestamp (nullable = true)

We can get the earliest publication date for each author as follows:

from pyspark.sql import functions as f

# Fold over Books, keeping the smallest DATE seen so far; the far-future
# timestamp is only the accumulator's initial value
df = df.withColumn('FIRST_PUBLICATION',
                   f.aggregate(
                       'Books',
                       f.lit(datetime.strptime('2222-02-22 22:22:22', '%Y-%m-%d %H:%M:%S')),
                       lambda acc, b: f.least(acc, b['DATE'])
                   ))

Result

# df.show()
+---------+--------------------+-------------------+
|AUTHOR_ID|               Books|  FIRST_PUBLICATION|
+---------+--------------------+-------------------+
|        1|[{xs, 2022-04-06 ...|2022-04-05 00:00:00|
+---------+--------------------+-------------------+
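
If you also want the matching book struct rather than just the date, one possible follow-up (a sketch, assuming Spark 3.1+ for f.filter) is to keep the array elements whose DATE equals the computed minimum:

# sketch: keep only the books whose DATE matches FIRST_PUBLICATION
df = df.withColumn('OLDEST_BOOKS',
                   f.filter('Books', lambda b: b['DATE'] == f.col('FIRST_PUBLICATION')))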

CodePudding user response:

Since Spark 2.4, you can use the array_min function to retrieve the minimum element of an array. Here you apply it to an array containing only the dates; to build that array, use the getField method on the Books column.

Here is the complete code:

from pyspark.sql import functions as F

df = df.withColumn('FIRST_PUBLICATION', F.array_min(F.col('Books').getField('DATE')))
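
To get the JSON shape requested in the question, a possible finishing step (a sketch reusing to_json and struct, as in the first answer) would be:

result = df.select(F.to_json(F.struct('AUTHOR_ID', 'FIRST_PUBLICATION', 'Books')).alias('value'))
result.show(truncate=False)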