How to join 2 dataframes and add a new column based on a filter in PySpark


I have 2 dataframes,

df1 - CUSTOMER_ID, USER_ADDRESS_ID, ORDERED_TIME (time at which the order was placed)
df2 - CUSTOMER_ID, USER_ADDRESS_ID, latest_order (time at which the last order was placed from a specific address, per user)

I obtained df2 from df1 with the following command

import pyspark.sql.functions as f

df2 = df1.groupBy('CUSTOMER_ID','USER_ADDRESS_ID').agg(f.max("ORDERED_TIME").alias('latest_order'))

From these 2 dataframes, I would like a resultant dataframe that contains, for each (CUSTOMER_ID, USER_ADDRESS_ID), the number of orders placed in the 30 days leading up to the last order placed from that address.

Sample Data

df1

+-----------+---------------+-------------------+
|CUSTOMER_ID|USER_ADDRESS_ID|       ORDERED_TIME|
+-----------+---------------+-------------------+
|    7894496|      167514241|2021-01-27 13:37:49|
|   28596279|      178674171|2021-01-27 13:42:02|
|   12682115|      192834231|2021-01-27 22:20:23|
|    6981716|       13228441|2021-01-27 22:22:32|
+-----------+---------------+-------------------+

df2


+-----------+---------------+-------------------+
|CUSTOMER_ID|USER_ADDRESS_ID|       latest_order|
+-----------+---------------+-------------------+
|    5145237|       83276530|2021-07-28 16:52:40|
|   11634405|       21756839|2021-09-08 20:43:35|
|   43919672|      120835117|2020-10-03 21:44:21|
|   71206555|      170807531|2020-10-30 14:00:43|
+-----------+---------------+-------------------+

I tried using a UDF where I wanted to pass the second dataframe as an argument, but realized it's not possible to do so.

So I tried to do it with a join instead (I'm not sure about the syntax), in the following way

df1.join(df2, ["CUSTOMER_ID","USER_ADDRESS_ID"], how="inner").select(df1.filter((df1.USER_ADDRESS_ID == df2.USER_ADDRESS_ID) & (df1.ORDERED_TIME >= date_sub(df2.latest_order, 30)) & (df1.ORDERED_TIME <= date_sub(df2.latest_order, 1))).count()).alias("order_counts").show()

I end up getting the following error

org.apache.spark.sql.AnalysisException: Resolved attribute(s) USER_ADDRESS_ID#433,latest_order#434 missing from CUSTOMER_ID#416,USER_ADDRESS_ID#417,ORDERED_TIME#418 in operator !Filter (((USER_ADDRESS_ID#417 = USER_ADDRESS_ID#433) && (ORDERED_TIME#418 >= cast(date_sub(cast(latest_order#434 as date), 30) as timestamp))) && (ORDERED_TIME#418 <= cast(date_sub(cast(latest_order#434 as date), 1) as timestamp))). Attribute(s) with the same name appear in the operation: USER_ADDRESS_ID. Please check if the right attribute(s) are used.;;
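
For reference, this kind of ambiguity is usually avoided by aliasing both DataFrames and qualifying every column through the aliases, then aggregating the filtered rows instead of calling count() inside select(). A minimal sketch, keeping the same date bounds as the attempt above (the aliases o and l are just illustrative names, not part of the original code):

import pyspark.sql.functions as f

order_counts = (
    df1.alias("o")
       .join(df2.alias("l"), ["CUSTOMER_ID", "USER_ADDRESS_ID"], how="inner")
       # keep only orders between 30 days and 1 day before that address's latest order
       .where(
           (f.col("o.ORDERED_TIME") >= f.date_sub(f.col("l.latest_order"), 30)) &
           (f.col("o.ORDERED_TIME") <= f.date_sub(f.col("l.latest_order"), 1))
       )
       .groupBy("CUSTOMER_ID", "USER_ADDRESS_ID")
       .agg(f.count("*").alias("order_counts"))
)
order_counts.show()

Whether the upper bound should be date_sub(latest_order, 1) or latest_order itself depends on whether the latest order should be counted.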

I am learning PySpark and would like to know the right way to solve this problem. Please let me know if you need any other information.

Any help will be appreciated. Thanks!

CodePudding user response:

The latest order time can be calculated via a Window function, and each record's ORDERED_TIME can then be compared to that latest value:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import spark.implicits._

val df1 = Seq(
  (7894496, 167514241, "2021-01-27 13:37:49"),
  (28596279, 178674171, "2021-01-27 13:42:02"),
  (12682115, 192834231, "2021-01-27 22:20:23"),
  (6981716, 13228441, "2021-01-27 22:22:32")
).toDF("CUSTOMER_ID", "USER_ADDRESS_ID", "ORDERED_TIME")

// Window over all orders of a given (CUSTOMER_ID, USER_ADDRESS_ID)
val customerAddressWindow = Window.partitionBy("CUSTOMER_ID", "USER_ADDRESS_ID")

val df2 = df1
  .withColumn("ORDERED_TIMESTAMP", to_timestamp($"ORDERED_TIME", "yyyy-MM-dd HH:mm:ss"))
  .withColumn("latest_order", max("ORDERED_TIMESTAMP").over(customerAddressWindow))
  .groupBy("CUSTOMER_ID", "USER_ADDRESS_ID", "latest_order")
  // count only the orders placed within 30 days of the latest order
  .agg(sum(
    when(datediff($"latest_order", $"ORDERED_TIMESTAMP") < 30, 1).otherwise(0)
  ).alias("Orders"))

df2.show(false)

Output is:

+-----------+---------------+-------------------+------+
|CUSTOMER_ID|USER_ADDRESS_ID|latest_order       |Orders|
+-----------+---------------+-------------------+------+
|6981716    |13228441       |2021-01-27 22:22:32|1     |
|12682115   |192834231      |2021-01-27 22:20:23|1     |
|7894496    |167514241      |2021-01-27 13:37:49|1     |
|28596279   |178674171      |2021-01-27 13:42:02|1     |
+-----------+---------------+-------------------+------+
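
Since the question is in PySpark, a rough equivalent of the Scala snippet above might look like this (a sketch, assuming a SparkSession named spark and the same sample rows and column names):

from pyspark.sql import SparkSession, functions as f
from pyspark.sql.window import Window

spark = SparkSession.builder.getOrCreate()

df1 = spark.createDataFrame(
    [
        (7894496, 167514241, "2021-01-27 13:37:49"),
        (28596279, 178674171, "2021-01-27 13:42:02"),
        (12682115, 192834231, "2021-01-27 22:20:23"),
        (6981716, 13228441, "2021-01-27 22:22:32"),
    ],
    ["CUSTOMER_ID", "USER_ADDRESS_ID", "ORDERED_TIME"],
)

# latest order per (CUSTOMER_ID, USER_ADDRESS_ID), computed with a window
customer_address_window = Window.partitionBy("CUSTOMER_ID", "USER_ADDRESS_ID")

df2 = (
    df1
    .withColumn("ORDERED_TIMESTAMP", f.to_timestamp("ORDERED_TIME", "yyyy-MM-dd HH:mm:ss"))
    .withColumn("latest_order", f.max("ORDERED_TIMESTAMP").over(customer_address_window))
    .groupBy("CUSTOMER_ID", "USER_ADDRESS_ID", "latest_order")
    # count only the orders placed within 30 days of the latest order
    .agg(
        f.sum(
            f.when(f.datediff(f.col("latest_order"), f.col("ORDERED_TIMESTAMP")) < 30, 1).otherwise(0)
        ).alias("Orders")
    )
)

df2.show(truncate=False)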