I have a DataFrame with the following schema:
root
|-- AUTHOR_ID: integer (nullable = false)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: timestamp (nullable = true)
How can I find the oldest published book for each author? I want to retrieve the date:
{
"AUTHOR_ID": 1,
"FIRST_PUBLICATION": <Date>
"Books": "[ ... ]"
}
CodePudding user response:
There are many ways of doing this; let's try window functions. Given this schema and sample data:
root
|-- AUTHOR_ID: integer (nullable = false)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: date (nullable = true)
+---------+--------------------------------+
|AUTHOR_ID|Books                           |
+---------+--------------------------------+
|21       |[{Stories of Mary, 2019-12-01}] |
|34       |[{Sorrows of Mary, 2019-09-01}] |
|34       |[{Sparrows of Mary, 2019-06-16}]|
|21       |[{Songs of Mary, 2017-03-14}]   |
+---------+--------------------------------+
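For reference, here is a minimal sketch of how this sample DataFrame could be built (this construction is assumed for reproducibility, not part of the original answer):
import datetime
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(21, [('Stories of Mary', datetime.date(2019, 12, 1))]),
     (34, [('Sorrows of Mary', datetime.date(2019, 9, 1))]),
     (34, [('Sparrows of Mary', datetime.date(2019, 6, 16))]),
     (21, [('Songs of Mary', datetime.date(2017, 3, 14))])],
    'AUTHOR_ID int, Books array<struct<NAME:string,DATE:date>>')  # tuples map positionally onto the struct fields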
Following your edits:
from pyspark.sql import functions as F
from pyspark.sql.functions import col, row_number, to_json
from pyspark.sql.window import Window

win = Window.partitionBy('AUTHOR_ID').orderBy(F.asc('Books.Date'))
df1 = (
    df.withColumn('rank', row_number().over(win)).where(col('rank') == 1).drop('rank')  # keep only each author's row with the oldest date
      .withColumn('value', to_json(F.struct(col('AUTHOR_ID'), col('Books.Date').alias('FIRST_PUBLICATION'), 'Books')))  # build the JSON column
)
df1.select('value').show(truncate=False)
+-------------------------------------------------------------------------------------------------------------+
|value                                                                                                        |
+-------------------------------------------------------------------------------------------------------------+
|{"AUTHOR_ID":21,"FIRST_PUBLICATION":["2017-03-14"],"Books":[{"NAME":"Songs of Mary","DATE":"2017-03-14"}]}   |
|{"AUTHOR_ID":34,"FIRST_PUBLICATION":["2019-06-16"],"Books":[{"NAME":"Sparrows of Mary","DATE":"2019-06-16"}]}|
+-------------------------------------------------------------------------------------------------------------+
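Note that FIRST_PUBLICATION comes out as a single-element array because col('Books.Date') extracts the DATE field from every struct in the Books array. If a scalar date is wanted instead, one option (assuming one book per row, as in this sample) is F.element_at(col('Books.Date'), 1) before aliasing.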
CodePudding user response:
For Spark 3+, using Spark higher-order functions is the best solution:
from datetime import datetime
from pyspark.sql import Row

df = spark.createDataFrame(
    [("1", [Row(NAME="xs", DATE=datetime.strptime('2022-04-06 00:00:00', '%Y-%m-%d %H:%M:%S')),
            Row(NAME="s", DATE=datetime.strptime('2022-04-05 00:00:00', '%Y-%m-%d %H:%M:%S'))])],
    'struct<AUTHOR_ID:string,Books:array<struct<NAME:string,DATE:timestamp>>>')
df.show(truncate=False)
+---------+-----------------------------------------------------+
|AUTHOR_ID|Books                                                |
+---------+-----------------------------------------------------+
|1        |[{xs, 2022-04-06 00:00:00}, {s, 2022-04-05 00:00:00}]|
+---------+-----------------------------------------------------+
df.printSchema()
root
|-- AUTHOR_ID: string (nullable = true)
|-- Books: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- DATE: timestamp (nullable = true)
We can get the earliest publication date for each author as follows, folding the Books array with f.aggregate and keeping the smallest DATE (the far-future timestamp serves as the initial accumulator):
from pyspark.sql import functions as f

df = df.withColumn('FIRST_PUBLICATION',
                   f.aggregate(
                       'Books',
                       f.lit(datetime.strptime('2222-02-22 22:22:22', '%Y-%m-%d %H:%M:%S')),  # initial accumulator: later than any real date
                       lambda acc, b: f.least(acc, b['DATE'])  # keep the smaller of the two dates
                   ))
Result of df.show():
+---------+--------------------+-------------------+
|AUTHOR_ID|               Books|  FIRST_PUBLICATION|
+---------+--------------------+-------------------+
|        1|[{xs, 2022-04-06 ...|2022-04-05 00:00:00|
+---------+--------------------+-------------------+
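To get the JSON shape asked for in the question, a possible follow-up (to_json and struct are standard PySpark; this select is assumed, not from the original answer):
df.select(f.to_json(f.struct('AUTHOR_ID', 'FIRST_PUBLICATION', 'Books')).alias('value')).show(truncate=False)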
CodePudding user response:
Since Spark 2.4, you can use the array_min function to retrieve the minimum element of an array. Apply it to an array that contains only the dates; to build that array, use the getField method on the Books column.
Here is the complete code:
from pyspark.sql import functions as F
df = df.withColumn('FIRST_PUBLICATION', F.array_min(F.col('Books').getField('DATE')))
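A minimal end-to-end sketch of this approach (the sample data below is assumed for illustration; only array_min and getField come from the answer itself):
import datetime
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(
    [(1, [('xs', datetime.date(2022, 4, 6)), ('s', datetime.date(2022, 4, 5))])],
    'AUTHOR_ID int, Books array<struct<NAME:string,DATE:date>>')

# Books.DATE (via getField) is an array<date>; array_min returns its smallest element
df = df.withColumn('FIRST_PUBLICATION', F.array_min(F.col('Books').getField('DATE')))
df.show(truncate=False)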