Home > Back-end >  Allocating monthly payment IDs to daily cashflows
Allocating monthly payment IDs to daily cashflows

Time:11-09

I have two pyspark dataframes as shown below. One is a monthly schedule of money being received.

from datetime import datetime
from pyspark.sql.types import *
from pyspark.sql import functions as F
from datetime import timedelta

data = [
    (datetime(2021, 1, 18), 365),
    (datetime(2021, 2, 18), 365),
    (datetime(2021, 3, 18), 365),
    (datetime(2021, 4, 18), 365),
    (datetime(2021, 5, 18), 365),
    (datetime(2021, 6, 18), 365),
    (datetime(2021, 7, 18), 365),
    (datetime(2021, 8, 18), 365),
    (datetime(2021, 9, 18), 365),
    (datetime(2021, 10, 18), 365),
    (datetime(2021, 11, 18), 365),
    (datetime(2021, 12, 18), 365),
]

schema = StructType(
    [StructField("date", DateType(), True), StructField("amount", IntegerType(), True)]
)

df = spark.createDataFrame(data, schema)
df = (
    df.withColumn("contract_id", F.lit("P0"))
    .withColumn(
        "pmt_id",
        F.concat(
            F.lit("P0_"),
            F.row_number().over(Window.partitionBy().orderBy(F.col("date"))) - 1,
        ),
    )
    .select("contract_id", "pmt_id", "date", "amount")
)
df.show()
 ----------- ------ ---------- ------ 
|contract_id|pmt_id|      date|amount|
 ----------- ------ ---------- ------ 
|         P0|  P0_0|2021-01-18|   365|
|         P0|  P0_1|2021-02-18|   365|
|         P0|  P0_2|2021-03-18|   365|
|         P0|  P0_3|2021-04-18|   365|
|         P0|  P0_4|2021-05-18|   365|
|         P0|  P0_5|2021-06-18|   365|
|         P0|  P0_6|2021-07-18|   365|
|         P0|  P0_7|2021-08-18|   365|
|         P0|  P0_8|2021-09-18|   365|
|         P0|  P0_9|2021-10-18|   365|
|         P0| P0_10|2021-11-18|   365|
|         P0| P0_11|2021-12-18|   365|
 ----------- ------ ---------- ------ 

and the second one is a daily schedule of the money being received:

data_2 = [(datetime(2021, 1, 18)   timedelta(i), 12) for i in range(365)]

schema_2 = StructType(
    [StructField("date", DateType(), True), StructField("amount", IntegerType(), True)]
)

df_2 = spark.createDataFrame(data_2, schema_2)
df_2 = df_2.withColumn(
    "contract_id",
    F.concat(
        F.lit("P0"),
    ),
).select("contract_id", "date", "amount")
df_2.show()
 ----------- ---------- ------ 
|contract_id|      date|amount|
 ----------- ---------- ------ 
|         P0|2021-01-18|    12|
|         P0|2021-01-19|    12|
|         P0|2021-01-20|    12|
|         P0|2021-01-21|    12|
|         P0|2021-01-22|    12|
|         P0|2021-01-23|    12|
|         P0|2021-01-24|    12|
|         P0|2021-01-25|    12|
|         P0|2021-01-26|    12|
|         P0|2021-01-27|    12|
|         P0|2021-01-28|    12|
|         P0|2021-01-29|    12|
|         P0|2021-01-30|    12|
|         P0|2021-01-31|    12|
|         P0|2021-02-01|    12|
|         P0|2021-02-02|    12|
|         P0|2021-02-03|    12|
|         P0|2021-02-04|    12|
|         P0|2021-02-05|    12|
|         P0|2021-02-06|    12|
 ----------- ---------- ------ 

I need to allocate the pmt_id field from the monthly dataframe to the daily payments, so that it looks like this:

 ----------- ---------- ------ ------ 
|contract_id|      date|amount|pmt_id|
 ----------- ---------- ------ ------ 
|         P0|2021-01-18|    12|  P0_0|
|         P0|2021-01-19|    12|  P0_0|
|         P0|2021-01-20|    12|  P0_0|
|         P0|2021-01-21|    12|  P0_0|
|         P0|2021-01-22|    12|  P0_0|
|         P0|2021-01-23|    12|  P0_0|
|         P0|2021-01-24|    12|  P0_0|
|         P0|2021-01-25|    12|  P0_0|
|         P0|2021-01-26|    12|  P0_0|
|         P0|2021-01-27|    12|  P0_0|
.
.
.
|         P0|2021-02-17|    12|  P0_0|
|         P0|2021-02-18|    12|  P0_1|
|         P0|2021-02-19|    12|  P0_1|
 ----------- ---------- ------ ------  

I have tried the following, but it doesn't work.

(
    df_2
    .withColumn(
        "cumulativeAmount",
        F.sum(F.col("amount")).over(Window.partitionBy("contract_id").orderBy("date"))
    )
    .join(
        df
            .withColumn(
            "cumulativeAmountMonthly",
            F.sum(F.col("amount")).over(Window.partitionBy("contract_id").orderBy("date"))
        )
        .withColumnRenamed("date", "monthlyDate")
        .withColumnRenamed("amount", "amountDaily"),
        on=["contract_id"],
        how="left"
    )
    .withColumn(
        "groupId",
        F.when(
            F.col("cumulativeAmount") < F.col("cumulativeAmountMonthly"),
            F.col("pmt_id")
        )
        .otherwise(None)
    )
    .distinct()
    .orderBy("contract_id", "date")
).show()

 ----------- ---------- ------ ---------------- ------ ----------- ----------- ----------------------- ------- 
|contract_id|      date|amount|cumulativeAmount|pmt_id|monthlyDate|amountDaily|cumulativeAmountMonthly|groupId|
 ----------- ---------- ------ ---------------- ------ ----------- ----------- ----------------------- ------- 
|         P0|2021-01-18|    12|              12|  P0_2| 2021-03-18|        365|                   1095|   P0_2|
|         P0|2021-01-18|    12|              12|  P0_4| 2021-05-18|        365|                   1825|   P0_4|
|         P0|2021-01-18|    12|              12|  P0_3| 2021-04-18|        365|                   1460|   P0_3|
|         P0|2021-01-18|    12|              12|  P0_6| 2021-07-18|        365|                   2555|   P0_6|
|         P0|2021-01-18|    12|              12| P0_11| 2021-12-18|        365|                   4380|  P0_11|
|         P0|2021-01-18|    12|              12| P0_10| 2021-11-18|        365|                   4015|  P0_10|
|         P0|2021-01-18|    12|              12|  P0_8| 2021-09-18|        365|                   3285|   P0_8|
|         P0|2021-01-18|    12|              12|  P0_1| 2021-02-18|        365|                    730|   P0_1|
|         P0|2021-01-18|    12|              12|  P0_0| 2021-01-18|        365|                    365|   P0_0|
|         P0|2021-01-18|    12|              12|  P0_7| 2021-08-18|        365|                   2920|   P0_7|
|         P0|2021-01-18|    12|              12|  P0_5| 2021-06-18|        365|                   2190|   P0_5|
|         P0|2021-01-18|    12|              12|  P0_9| 2021-10-18|        365|                   3650|   P0_9|
|         P0|2021-01-19|    12|              24|  P0_2| 2021-03-18|        365|                   1095|   P0_2|
|         P0|2021-01-19|    12|              24| P0_10| 2021-11-18|        365|                   4015|  P0_10|
|         P0|2021-01-19|    12|              24|  P0_4| 2021-05-18|        365|                   1825|   P0_4|
|         P0|2021-01-19|    12|              24|  P0_0| 2021-01-18|        365|                    365|   P0_0|
|         P0|2021-01-19|    12|              24|  P0_1| 2021-02-18|        365|                    730|   P0_1|
|         P0|2021-01-19|    12|              24| P0_11| 2021-12-18|        365|                   4380|  P0_11|
|         P0|2021-01-19|    12|              24|  P0_5| 2021-06-18|        365|                   2190|   P0_5|
|         P0|2021-01-19|    12|              24|  P0_8| 2021-09-18|        365|                   3285|   P0_8|
 ----------- ---------- ------ ---------------- ------ ----------- ----------- ----------------------- ------- 

CodePudding user response:

a simple join with a between condition should do the trick :

df.withColumnRenamed("date", "start_date").withColumn(
    "end_date",
    F.coalesce(
        F.lead("start_date").over(Window.orderBy("start_date")),
        F.lit(date(9999, 12, 31)),
    ),
).join(
    df_2,
    on=df_2["date"].between(F.col("start_date"), F.col("end_date"))
).show(50)

 ----------- ------ ---------- ------ ---------- ----------- ---------- ------  
|contract_id|pmt_id|start_date|amount|  end_date|contract_id|      date|amount|
 ----------- ------ ---------- ------ ---------- ----------- ---------- ------ 
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-18|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-19|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-20|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-21|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-22|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-23|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-24|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-25|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-26|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-27|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-28|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-29|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-30|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-01-31|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-01|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-02|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-03|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-04|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-05|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-06|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-07|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-08|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-09|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-10|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-11|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-12|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-13|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-14|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-15|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-16|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-17|    12|
|         P0|  P0_0|2021-01-18|   365|2021-02-18|         P0|2021-02-18|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-18|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-19|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-20|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-21|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-22|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-23|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-24|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-25|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-26|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-27|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-02-28|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-01|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-02|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-03|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-04|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-05|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-06|    12|
|         P0|  P0_1|2021-02-18|   365|2021-03-18|         P0|2021-03-07|    12|
 ----------- ------ ---------- ------ ---------- ----------- ---------- ------ 
only showing top 50 rows
  • Related