I am learning Spark, and I am trying to create a column of the difference in days between a date and a cutoff value.
Here is some data along with my solution using pandas.

import numpy as np
import pandas as pd

lst = ['2018-11-21',
'2018-11-01',
'2018-10-09',
'2018-11-23',
'2018-11-08',
'2018-10-06',
'2018-11-27',
'2018-10-07',
'2018-10-23',
'2018-11-02']
d = pd.DataFrame({'event': np.arange(len(lst)), 'ts': lst})
d['ts'] = d['ts'].apply(pd.to_datetime)  # only needed because I have a list of strings
d['new_ts'] = d.ts - (d.ts.max() - pd.to_timedelta(15, unit='d'))
Unfortunately I can't find a way to adapt this logic to PySpark. I think the issue is the subtraction of a static date that is not part of the DataFrame.
Assuming that df is the "Spark version" of the above dataset "d", here is one of the things I tried:
calculator = udf(lambda x: datediff(datediff(date_sub(max(x),30),x)))
c = df.withColumn('Recency',calculator(col('ts')))
However, the following calls give me a long error:

c.select(col('Recency')).show(1)
c.show(1)
Thanks in advance to anyone who can help.
CodePudding user response:
The logic is:
- Compute the max date.
- Subtract the given number of days to get the cutoff date.
- Compute each date's difference in days from the cutoff.
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.getOrCreate()
df = spark.createDataFrame(data=[["2018-11-21"], ["2018-11-01"], ["2018-10-09"], ["2018-11-23"], ["2018-11-08"], ["2018-10-06"], ["2018-11-27"], ["2018-10-07"], ["2018-10-23"], ["2018-11-02"]], schema=["ts"])
df = df.withColumn("ts", F.to_date("ts", "yyyy-MM-dd"))  # parse the strings into a DateType column
# Cutoff = max date minus 15 days, collected to the driver as a single Python date.
cutoff_dt = df.select(F.date_sub(F.max("ts"), 15).alias("cutoff_dt")).first().asDict()["cutoff_dt"]
df = df.withColumn("new_ts", F.datediff("ts", F.lit(cutoff_dt)))  # days between each date and the cutoff
df.show(truncate=False)
+----------+------+
|ts        |new_ts|
+----------+------+
|2018-11-21|9     |
|2018-11-01|-11   |
|2018-10-09|-34   |
|2018-11-23|11    |
|2018-11-08|-4    |
|2018-10-06|-37   |
|2018-11-27|15    |
|2018-10-07|-36   |
|2018-10-23|-20   |
|2018-11-02|-10   |
+----------+------+
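As a side note, if you would rather not pull the cutoff date back to the driver, the same result can be computed in a single pass with an unpartitioned window. This is just a sketch of an alternative, not the only way to do it (an empty window moves all rows into one partition, which is fine for small data):

from pyspark.sql import Window
import pyspark.sql.functions as F

# Window spanning the whole DataFrame, so max("ts") is available on every row.
w = Window.partitionBy()
df = df.withColumn("new_ts", F.datediff("ts", F.date_sub(F.max("ts").over(w), 15)))
df.show(truncate=False)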