I have a Spark SQL dataframe consisting of observations for people with a survey date, as follows:
person_id | survey_date |
---|---|
1 | 2022-03-11 |
2 | 2022-02-09 |
3 | 2022-01-21 |
4 | 2022-04-16 |
`person_id` is unique, but the dates can be shared or not.
I want to use this table to filter another table containing transactions for the same people. I only need the transactions that occurred on or after the survey date (`trans_date >= survey_date`):
person_id | trans_date |
---|---|
1 | 2022-03-10 |
1 | 2022-03-14 |
1 | 2022-02-11 |
2 | 2022-01-30 |
2 | 2022-03-07 |
2 | 2022-02-16 |
3 | 2022-03-02 |
4 | 2022-05-15 |
In this case I would only want the following:
person_id | trans_date |
---|---|
1 | 2022-03-14 |
2 | 2022-03-07 |
2 | 2022-02-16 |
3 | 2022-03-02 |
4 | 2022-05-15 |
How can I do this?
Data examples:
```python
survey_data = [["1", "2022-03-11"],
               ["2", "2022-02-09"],
               ["3", "2022-01-21"],
               ["4", "2022-04-16"]]

transaction_data = [["1", "2022-03-10"],
                    ["1", "2022-03-14"],
                    ["1", "2022-02-11"],
                    ["2", "2022-01-30"],
                    ["2", "2022-03-07"],
                    ["2", "2022-02-16"],
                    ["3", "2022-03-02"],
                    ["4", "2022-05-15"]]
```
CodePudding user response:
You can leverage the `join` function from PySpark. One way to achieve this (see if it helps):
```python
from pyspark.sql.functions import col, to_date

survey_data = [["1", "2022-03-11"],
               ["2", "2022-02-09"],
               ["3", "2022-01-21"],
               ["4", "2022-04-16"]]
survey_dataframe = spark.createDataFrame(data=survey_data, schema=['person_id', 'survey_date'])
# Skip this cast if the column is already DateType
survey_dataframe = survey_dataframe.select(col("person_id"),
                                           to_date(col("survey_date")).alias("survey_date"))

transaction_data = [["1", "2022-03-10"],
                    ["1", "2022-03-14"],
                    ["1", "2022-02-11"],
                    ["2", "2022-01-30"],
                    ["2", "2022-03-07"],
                    ["2", "2022-02-16"],
                    ["3", "2022-03-02"],
                    ["4", "2022-05-15"]]
transaction_dataframe = spark.createDataFrame(data=transaction_data, schema=['person_id', 'trans_date'])
# Skip this cast if the column is already DateType
transaction_dataframe = transaction_dataframe.select(col("person_id"),
                                                     to_date(col("trans_date")).alias("trans_date"))

# Join on person_id, keeping only transactions on or after the survey date
condition = [survey_dataframe.person_id == transaction_dataframe.person_id,
             survey_dataframe.survey_date <= transaction_dataframe.trans_date]
output = (survey_dataframe
          .join(transaction_dataframe, condition, "inner")
          .select(survey_dataframe["person_id"], "trans_date"))
output.show()
```
Output:
```
+---------+----------+
|person_id|trans_date|
+---------+----------+
|        1|2022-03-14|
|        2|2022-03-07|
|        2|2022-02-16|
|        3|2022-03-02|
|        4|2022-05-15|
+---------+----------+
```
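As a sanity check on the `<=` condition, the same "transactions on or after the survey date" rule can be replayed in plain Python on the question's data (illustrative stand-in, not the Spark API). Since ISO-8601 date strings sort lexicographically in date order, string comparison works directly here:

```python
# Survey date per person (person_id -> survey_date)
survey = {"1": "2022-03-11", "2": "2022-02-09",
          "3": "2022-01-21", "4": "2022-04-16"}

transactions = [("1", "2022-03-10"), ("1", "2022-03-14"), ("1", "2022-02-11"),
                ("2", "2022-01-30"), ("2", "2022-03-07"), ("2", "2022-02-16"),
                ("3", "2022-03-02"), ("4", "2022-05-15")]

# Keep each transaction whose date is on or after that person's survey date;
# ISO-8601 strings compare the same way the underlying dates do
kept = [(p, d) for p, d in transactions if d >= survey[p]]
print(kept)
```

This reproduces the five rows shown by `output.show()` above.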
CodePudding user response:
First `join`, then `filter`. Avoid putting `>=`, `<=`, `<`, or `>` inside the join condition itself, because non-equi conditions are a performance drag; keep them in a separate `filter` and only fold them into the join when you can't escape it.
```python
df = (df_trans
      .join(df_survey, "person_id")
      .filter("trans_date >= survey_date")
      .drop("survey_date")
)
```
Note: the default join type is inner.
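The join-then-filter shape above can be sketched without Spark to see the two stages separately (plain-Python stand-in on the question's data, not the Spark API):

```python
survey_rows = [("1", "2022-03-11"), ("2", "2022-02-09"),
               ("3", "2022-01-21"), ("4", "2022-04-16")]
trans_rows = [("1", "2022-03-10"), ("1", "2022-03-14"), ("1", "2022-02-11"),
              ("2", "2022-01-30"), ("2", "2022-03-07"), ("2", "2022-02-16"),
              ("3", "2022-03-02"), ("4", "2022-05-15")]

# Stage 1: inner join on person_id only (equi-join, cheap to shuffle)
joined = [(p, t, s) for p, t in trans_rows
          for q, s in survey_rows if p == q]

# Stage 2: range filter, then "drop" survey_date from the result
result = [(p, t) for p, t, s in joined if t >= s]
print(result)
```

The point of the split is that the join condition stays a plain equality, and the range comparison runs as an ordinary row-by-row filter afterwards.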