I have two pyspark dataframes as shown below. One is a monthly schedule of money being received.
from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import timedelta
data = [
(datetime(2021, 1, 18), 365),
(datetime(2021, 2, 18), 365),
(datetime(2021, 3, 18), 365),
(datetime(2021, 4, 18), 365),
(datetime(2021, 5, 18), 365),
(datetime(2021, 6, 18), 365),
(datetime(2021, 7, 18), 365),
(datetime(2021, 8, 18), 365),
(datetime(2021, 9, 18), 365),
(datetime(2021, 10, 18), 365),
(datetime(2021, 11, 18), 365),
(datetime(2021, 12, 18), 365),
]
schema = StructType(
[StructField("date", DateType(), True), StructField("amount", IntegerType(), True)]
)
df = spark.createDataFrame(data, schema)
df = (
df.withColumn("contract_id", F.lit("P0"))
.withColumn(
"pmt_id",
F.concat(
F.lit("P0_"),
F.row_number().over(Window.partitionBy().orderBy(F.col("date"))) - 1,
),
)
.select("contract_id", "pmt_id", "date", "amount")
)
df.show()
----------- ------ ---------- ------
|contract_id|pmt_id| date|amount|
----------- ------ ---------- ------
| P0| P0_0|2021-01-18| 365|
| P0| P0_1|2021-02-18| 365|
| P0| P0_2|2021-03-18| 365|
| P0| P0_3|2021-04-18| 365|
| P0| P0_4|2021-05-18| 365|
| P0| P0_5|2021-06-18| 365|
| P0| P0_6|2021-07-18| 365|
| P0| P0_7|2021-08-18| 365|
| P0| P0_8|2021-09-18| 365|
| P0| P0_9|2021-10-18| 365|
| P0| P0_10|2021-11-18| 365|
| P0| P0_11|2021-12-18| 365|
----------- ------ ---------- ------
and the second one is a daily schedule of the money being received:
data_2 = [(datetime(2021, 1, 18) timedelta(i), 12) for i in range(365)]
schema_2 = StructType(
[StructField("date", DateType(), True), StructField("amount", IntegerType(), True)]
)
df_2 = spark.createDataFrame(data_2, schema_2)
df_2 = df_2.withColumn(
"contract_id",
F.concat(
F.lit("P0"),
),
).select("contract_id", "date", "amount")
df_2.show()
----------- ---------- ------
|contract_id| date|amount|
----------- ---------- ------
| P0|2021-01-18| 12|
| P0|2021-01-19| 12|
| P0|2021-01-20| 12|
| P0|2021-01-21| 12|
| P0|2021-01-22| 12|
| P0|2021-01-23| 12|
| P0|2021-01-24| 12|
| P0|2021-01-25| 12|
| P0|2021-01-26| 12|
| P0|2021-01-27| 12|
| P0|2021-01-28| 12|
| P0|2021-01-29| 12|
| P0|2021-01-30| 12|
| P0|2021-01-31| 12|
| P0|2021-02-01| 12|
| P0|2021-02-02| 12|
| P0|2021-02-03| 12|
| P0|2021-02-04| 12|
| P0|2021-02-05| 12|
| P0|2021-02-06| 12|
----------- ---------- ------
I need to allocate the pmt_id
field from the monthly dataframe to the daily payments, so that it looks like this:
----------- ---------- ------ ------
|contract_id| date|amount|pmt_id|
----------- ---------- ------ ------
| P0|2021-01-18| 12| P0_0|
| P0|2021-01-19| 12| P0_0|
| P0|2021-01-20| 12| P0_0|
| P0|2021-01-21| 12| P0_0|
| P0|2021-01-22| 12| P0_0|
| P0|2021-01-23| 12| P0_0|
| P0|2021-01-24| 12| P0_0|
| P0|2021-01-25| 12| P0_0|
| P0|2021-01-26| 12| P0_0|
| P0|2021-01-27| 12| P0_0|
.
.
.
| P0|2021-02-17| 12| P0_0|
| P0|2021-02-18| 12| P0_1|
| P0|2021-02-19| 12| P0_1|
----------- ---------- ------ ------
I have tried the following, but it doesn't work.
(
df_2
.withColumn(
"cumulativeAmount",
F.sum(F.col("amount")).over(Window.partitionBy("contract_id").orderBy("date"))
)
.join(
df
.withColumn(
"cumulativeAmountMonthly",
F.sum(F.col("amount")).over(Window.partitionBy("contract_id").orderBy("date"))
)
.withColumnRenamed("date", "monthlyDate")
.withColumnRenamed("amount", "amountDaily"),
on=["contract_id"],
how="left"
)
.withColumn(
"groupId",
F.when(
F.col("cumulativeAmount") < F.col("cumulativeAmountMonthly"),
F.col("pmt_id")
)
.otherwise(None)
)
.distinct()
.orderBy("contract_id", "date")
).show()
----------- ---------- ------ ---------------- ------ ----------- ----------- ----------------------- -------
|contract_id| date|amount|cumulativeAmount|pmt_id|monthlyDate|amountDaily|cumulativeAmountMonthly|groupId|
----------- ---------- ------ ---------------- ------ ----------- ----------- ----------------------- -------
| P0|2021-01-18| 12| 12| P0_2| 2021-03-18| 365| 1095| P0_2|
| P0|2021-01-18| 12| 12| P0_4| 2021-05-18| 365| 1825| P0_4|
| P0|2021-01-18| 12| 12| P0_3| 2021-04-18| 365| 1460| P0_3|
| P0|2021-01-18| 12| 12| P0_6| 2021-07-18| 365| 2555| P0_6|
| P0|2021-01-18| 12| 12| P0_11| 2021-12-18| 365| 4380| P0_11|
| P0|2021-01-18| 12| 12| P0_10| 2021-11-18| 365| 4015| P0_10|
| P0|2021-01-18| 12| 12| P0_8| 2021-09-18| 365| 3285| P0_8|
| P0|2021-01-18| 12| 12| P0_1| 2021-02-18| 365| 730| P0_1|
| P0|2021-01-18| 12| 12| P0_0| 2021-01-18| 365| 365| P0_0|
| P0|2021-01-18| 12| 12| P0_7| 2021-08-18| 365| 2920| P0_7|
| P0|2021-01-18| 12| 12| P0_5| 2021-06-18| 365| 2190| P0_5|
| P0|2021-01-18| 12| 12| P0_9| 2021-10-18| 365| 3650| P0_9|
| P0|2021-01-19| 12| 24| P0_2| 2021-03-18| 365| 1095| P0_2|
| P0|2021-01-19| 12| 24| P0_10| 2021-11-18| 365| 4015| P0_10|
| P0|2021-01-19| 12| 24| P0_4| 2021-05-18| 365| 1825| P0_4|
| P0|2021-01-19| 12| 24| P0_0| 2021-01-18| 365| 365| P0_0|
| P0|2021-01-19| 12| 24| P0_1| 2021-02-18| 365| 730| P0_1|
| P0|2021-01-19| 12| 24| P0_11| 2021-12-18| 365| 4380| P0_11|
| P0|2021-01-19| 12| 24| P0_5| 2021-06-18| 365| 2190| P0_5|
| P0|2021-01-19| 12| 24| P0_8| 2021-09-18| 365| 3285| P0_8|
----------- ---------- ------ ---------------- ------ ----------- ----------- ----------------------- -------
CodePudding user response:
a simple join with a between
condition should do the trick :
df.withColumnRenamed("date", "start_date").withColumn(
"end_date",
F.coalesce(
F.lead("start_date").over(Window.orderBy("start_date")),
F.lit(date(9999, 12, 31)),
),
).join(
df_2,
on=df_2["date"].between(F.col("start_date"), F.col("end_date"))
).show(50)
----------- ------ ---------- ------ ---------- ----------- ---------- ------
|contract_id|pmt_id|start_date|amount| end_date|contract_id| date|amount|
----------- ------ ---------- ------ ---------- ----------- ---------- ------
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-18| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-19| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-20| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-21| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-22| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-23| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-24| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-25| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-26| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-27| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-28| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-29| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-30| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-01-31| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-01| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-02| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-03| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-04| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-05| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-06| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-07| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-08| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-09| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-10| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-11| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-12| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-13| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-14| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-15| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-16| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-17| 12|
| P0| P0_0|2021-01-18| 365|2021-02-18| P0|2021-02-18| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-18| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-19| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-20| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-21| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-22| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-23| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-24| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-25| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-26| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-27| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-02-28| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-01| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-02| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-03| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-04| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-05| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-06| 12|
| P0| P0_1|2021-02-18| 365|2021-03-18| P0|2021-03-07| 12|
----------- ------ ---------- ------ ---------- ----------- ---------- ------
only showing top 50 rows