How to get the second highest value from a column in PySpark?


I have a PySpark DataFrame and I would like to get the second highest value of ORDERED_TIME (a DateTime field in yyyy-mm-dd format) after a groupBy applied to two columns, namely CUSTOMER_ID and ADDRESS_ID.

A customer can have many orders associated with an address, and I would like to get the second most recent order for a (customer, address) pair.

My approach was to make a window, partition it by CUSTOMER_ID and ADDRESS_ID, and sort it by ORDERED_TIME:

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(col('ORDERED_TIME').desc())

df2 = df2.withColumn("second_recent_order", (df2.select("ORDERED_TIME").collect()[1]).over(sorted_order_times))

However, I get an error saying ValueError: 'over' is not in list

Could anyone suggest the right way to go about solving this problem?

Please let me know if any other information is needed

Sample Data

+-----------+----------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|
+-----------+----------+-------------------+
|        100| 158932441|2021-01-02 13:35:57|
|        100| 158932441|2021-01-14 19:14:08|
|        100| 158932441|2021-01-03 13:33:52|
|        100| 158932441|2021-01-04 09:36:10|
|        101| 281838494|2020-05-07 13:35:57|
|        101| 281838494|2021-04-14 19:14:08|
+-----------+----------+-------------------+

Expected Output

+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
|        100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
|        100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
|        100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
|        101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
|        101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+

CodePudding user response:

One solution is to create a lookup table with the second most recent order for every pair of CUSTOMER_ID and ADDRESS_ID, and then join it back to the original dataframe.
I assume that your ORDERED_TIME column is already a timestamp type.

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# define window
w = Window.partitionBy('CUSTOMER_ID', 'ADDRESS_ID').orderBy(F.desc('ORDERED_TIME'))

# create lookup table with the second most recent order per (CUSTOMER_ID, ADDRESS_ID) pair
second_highest = df \
  .withColumn('rank', F.dense_rank().over(w)) \
  .filter(F.col('rank') == 2) \
  .select('CUSTOMER_ID', 'ADDRESS_ID',
          F.col('ORDERED_TIME').alias('second_recent_order'))

# join with original dataframe
df = df.join(second_highest, on=['CUSTOMER_ID', 'ADDRESS_ID'], how='left')

df.show()

+-----------+----------+-------------------+-------------------+
|CUSTOMER_ID|ADDRESS_ID|       ORDERED_TIME|second_recent_order|
+-----------+----------+-------------------+-------------------+
|        100| 158932441|2021-01-02 13:35:57|2021-01-04 09:36:10|
|        100| 158932441|2021-01-14 19:14:08|2021-01-04 09:36:10|
|        100| 158932441|2021-01-03 13:33:52|2021-01-04 09:36:10|
|        100| 158932441|2021-01-04 09:36:10|2021-01-04 09:36:10|
|        101| 281838494|2020-05-07 13:35:57|2020-05-07 13:35:57|
|        101| 281838494|2021-04-14 19:14:08|2020-05-07 13:35:57|
+-----------+----------+-------------------+-------------------+

Note: for CUSTOMER_ID == 101 the second most recent order is 2020-05-07 13:35:57 rather than 2021-04-14 19:14:08, because the 2021 timestamp is the most recent one.
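
If several orders can share the exact same timestamp, dense_rank() == 2 may keep more than one row per (CUSTOMER_ID, ADDRESS_ID) pair, and the join would then duplicate rows. A minimal variant of the lookup table, assuming you want exactly one row per pair in that case, ranks with row_number() instead (reusing the window w defined above):

# row_number() assigns a unique rank, so the filter keeps exactly one
# row per (CUSTOMER_ID, ADDRESS_ID) even when ORDERED_TIME values tie
second_highest = df \
  .withColumn('rn', F.row_number().over(w)) \
  .filter(F.col('rn') == 2) \
  .select('CUSTOMER_ID', 'ADDRESS_ID',
          F.col('ORDERED_TIME').alias('second_recent_order'))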

CodePudding user response:

Here is another way to do it, using collect_list:

import pyspark.sql.functions as F
from pyspark.sql import Window


# collect all ORDERED_TIME values in each partition (most recent first) and take the second one
sorted_order_times = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(F.col("ORDERED_TIME").desc())
    .rangeBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)
df2 = (
  df
  .withColumn("second_recent_order", F.collect_list(F.col("ORDERED_TIME")).over(sorted_order_times)[1])
)
df2.show()

The final output matches the expected output shown in the question.
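
On Spark 3.1+, a roughly equivalent sketch (assuming the same df as above) can use nth_value instead of collecting the whole list for every row:

import pyspark.sql.functions as F
from pyspark.sql.window import Window

# the frame must span the whole partition so every row can see the
# second most recent ORDERED_TIME of its group
w = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(F.desc("ORDERED_TIME"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

# nth_value is 1-based, so offset 2 picks the second most recent order
df2 = df.withColumn("second_recent_order", F.nth_value("ORDERED_TIME", 2).over(w))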

CodePudding user response:

You can use a window here in the following way, but you will get null if there is only one row in a group:


from pyspark.sql.functions import collect_list, desc
from pyspark.sql.window import Window

sorted_order_times = Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID").orderBy(desc("ORDERED_TIME")).rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df2 = df2.withColumn(
    "second_recent_order",
    collect_list("ORDERED_TIME").over(sorted_order_times).getItem(1)
)
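
If you prefer not to get null for groups with a single order, one possible fallback (an assumption on my part; here it falls back to the most recent order, but substitute whatever default fits your data) is a coalesce over the first two elements of the collected list:

from pyspark.sql.functions import coalesce, collect_list, desc
from pyspark.sql.window import Window

w = (
    Window.partitionBy("CUSTOMER_ID", "ADDRESS_ID")
    .orderBy(desc("ORDERED_TIME"))
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
)

df2 = df2.withColumn(
    "second_recent_order",
    coalesce(
        collect_list("ORDERED_TIME").over(w).getItem(1),  # second most recent
        collect_list("ORDERED_TIME").over(w).getItem(0),  # fallback: only one order in the group
    ),
)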
