Filter date as older than value in another dataframe


I have a Spark SQL DataFrame of survey observations, one row per person, each with a survey date:

person_id survey_date
1 2022-03-11
2 2022-02-09
3 2022-01-21
4 2022-04-16

person_id is unique, but dates can repeat.

I want to use this table to filter a second table of transactions for the same people, keeping only the transactions that occurred on or after the survey date (trans_date >= survey_date):

person_id trans_date
1 2022-03-10
1 2022-03-14
1 2022-02-11
2 2022-01-30
2 2022-03-07
2 2022-02-16
3 2022-03-02
4 2022-05-15

In this case I would only want the following:

person_id trans_date
1 2022-03-14
2 2022-03-07
2 2022-02-16
3 2022-03-02
4 2022-05-15

How can I do this in PySpark?

Data examples:

survey_data = [["1", "2022-03-11"],
               ["2", "2022-02-09"],
               ["3", "2022-01-21"],
               ["4", "2022-04-16"]]

transaction_data = [["1", "2022-03-10"],
                    ["1", "2022-03-14"],
                    ["1", "2022-02-11"],
                    ["2", "2022-01-30"],
                    ["2", "2022-03-07"],
                    ["2", "2022-02-16"],
                    ["3", "2022-03-02"],
                    ["4", "2022-05-15"]]
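As a sanity check on the keep-rule (separate from any Spark solution), the same filter can be mirrored in plain Python on the sample data above; a transaction is kept when its date is on or after that person's survey date:

```python
survey_data = [["1", "2022-03-11"],
               ["2", "2022-02-09"],
               ["3", "2022-01-21"],
               ["4", "2022-04-16"]]
transaction_data = [["1", "2022-03-10"],
                    ["1", "2022-03-14"],
                    ["1", "2022-02-11"],
                    ["2", "2022-01-30"],
                    ["2", "2022-03-07"],
                    ["2", "2022-02-16"],
                    ["3", "2022-03-02"],
                    ["4", "2022-05-15"]]

# ISO-8601 date strings compare correctly lexicographically, so no parsing needed
survey_date = dict(map(tuple, survey_data))
kept = [[pid, d] for pid, d in transaction_data if d >= survey_date[pid]]
print(kept)
```

Note that the >= rule also keeps person 3's 2022-03-02 transaction, since 2022-03-02 is after that person's 2022-01-21 survey date.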

CodePudding user response:

You can use PySpark's join with a compound join condition. One way to achieve this (see if it helps):

from pyspark.sql.functions import col, to_date

survey_data = [["1", "2022-03-11"],
               ["2", "2022-02-09"],
               ["3", "2022-01-21"],
               ["4", "2022-04-16"]]

survey_dataframe = spark.createDataFrame(data=survey_data, schema=['person_id', 'survey_date'])
# Skip this cast if survey_date is already DateType
survey_dataframe = survey_dataframe.select(col("person_id"), to_date(col("survey_date")).alias("survey_date"))

transaction_data = [["1", "2022-03-10"],
                    ["1", "2022-03-14"],
                    ["1", "2022-02-11"],
                    ["2", "2022-01-30"],
                    ["2", "2022-03-07"],
                    ["2", "2022-02-16"],
                    ["3", "2022-03-02"],
                    ["4", "2022-05-15"]]
transaction_dataframe = spark.createDataFrame(data=transaction_data, schema=['person_id', 'trans_date'])
# Skip this cast if trans_date is already DateType
transaction_dataframe = transaction_dataframe.select(col("person_id"), to_date(col("trans_date")).alias("trans_date"))

condition = [survey_dataframe.person_id == transaction_dataframe.person_id,
             survey_dataframe.survey_date <= transaction_dataframe.trans_date]
output = (survey_dataframe
          .join(transaction_dataframe, condition, "inner")
          .select(survey_dataframe["person_id"], "trans_date"))
output.show()

Output:

+---------+----------+
|person_id|trans_date|
+---------+----------+
|        1|2022-03-14|
|        2|2022-03-07|
|        2|2022-02-16|
|        3|2022-03-02|
|        4|2022-05-15|
+---------+----------+

CodePudding user response:

First join, then filter. Avoid putting range operators (>=, <=, <, >) inside the join condition itself: a non-equi condition prevents Spark from using an efficient equi-join strategy and is a performance drag. Use them in the join only if you can't avoid it.

df = (df_trans
    .join(df_survey, "person_id")
    .filter("trans_date >= survey_date")
    .drop("survey_date")
)

Note: default join is inner.
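To see that the two formulations agree on the rows kept (only the execution strategy differs), here is a plain-Python model of both, using the sample data from the question:

```python
survey = {"1": "2022-03-11", "2": "2022-02-09", "3": "2022-01-21", "4": "2022-04-16"}
trans = [("1", "2022-03-10"), ("1", "2022-03-14"), ("1", "2022-02-11"),
         ("2", "2022-01-30"), ("2", "2022-03-07"), ("2", "2022-02-16"),
         ("3", "2022-03-02"), ("4", "2022-05-15")]

# Variant 1: inequality evaluated as part of the join condition
in_join = [(p, d) for p, d in trans if p in survey and d >= survey[p]]

# Variant 2: equi-join on person_id first, then filter on the date
joined = [(p, d, survey[p]) for p, d in trans if p in survey]
then_filter = [(p, d) for p, d, s in joined if d >= s]

assert in_join == then_filter  # same rows either way
```

With an inner join both variants drop the same rows; they differ only in whether the date comparison happens during or after the join.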
