PySpark fill gap on datetimes with second frequency


Hi guys, I have this Spark DataFrame sample:

datetimes                A      B
2020-10-01 00:00:00      1      2
2020-10-01 00:00:02      2      0
2020-10-01 00:00:07      1      5

I want to build a time series model, so I need to fill in the datetimes that are missing from my DataFrame. I'm new to Spark, so I don't know how to do this with it. I tried converting it to a pandas DataFrame, but the data is too big to be converted.

This is what I tried, but it can't be done in PySpark:

import pandas as pd

idx = pd.date_range('2020-12-31 23:59:58', '2021-09-20 08:59:59', freq='s')
df.index = pd.DatetimeIndex(df.index)
df = df.reindex(idx)

My second idea is to right join my DataFrame with idx, but I don't think that's the best solution.
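
Roughly I mean something like this (idx_df and df_filled are just illustrative names, after converting idx to a Spark DataFrame first):

# convert the pandas range to Spark, then right join so every second appears
idx_df = spark.createDataFrame(pd.DataFrame(idx, columns=['datetimes']))
df_filled = df.join(idx_df, on='datetimes', how='right')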

CodePudding user response:

Ideally, make your dates available in Spark and perform the join and any other operations there, rather than trying to run them on any single node.

You may try the following, which converts a pandas DataFrame of all possible timestamps to a Spark DataFrame, then left joins your existing Spark DataFrame onto it to fill in the missing datetimes.

import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Window

idx = pd.date_range('2020-12-31 23:59:58', '2021-09-20 08:59:59', freq='s')

# create a Spark DataFrame with all possible timestamps
datetimes_df = spark.createDataFrame(pd.DataFrame(idx, columns=['datetimes']))

# assuming your original data is in `original_df`, left join it onto the full range
output_df = datetimes_df.join(original_df, ["datetimes"], "left")

If your DataFrame of datetimes doesn't use too much memory, you may instead try broadcasting it to all nodes and see whether that improves the join's performance, e.g.

output_df = F.broadcast(datetimes_df).join(original_df, ["datetimes"], "left")
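
Alternatively, if you'd rather not go through pandas at all, Spark can generate the per-second range natively with sequence and explode. This is just a sketch, assuming Spark 2.4+ and that spark is your SparkSession:

# generate one row per second entirely on the cluster (requires Spark 2.4+)
datetimes_df = spark.sql("""
    SELECT explode(sequence(
        to_timestamp('2020-12-31 23:59:58'),
        to_timestamp('2021-09-20 08:59:59'),
        interval 1 second
    )) AS datetimes
""")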

Without filling in the missing values from the left join, the result may look like this:

 ------------------- ---- ---- 
|          datetimes|   A|   B|
 ------------------- ---- ---- 
|2020-10-01 00:00:00|   1|   2|
|2020-10-01 00:00:01|null|null|
|2020-10-01 00:00:02|   2|   0|
|2020-10-01 00:00:03|null|null|
|2020-10-01 00:00:04|null|null|
|2020-10-01 00:00:05|null|null|
|2020-10-01 00:00:06|null|null|
|2020-10-01 00:00:07|   1|   5|
 ------------------- ---- ---- 

You can then treat the missing values by forward filling, i.e. carrying the previously available value down over an ordered window. Note that a window with no partitionBy moves all rows to a single partition, so this step can be slow on very large data:

fill_window = Window.orderBy("datetimes").rowsBetween(Window.unboundedPreceding, Window.currentRow)

# for each null, take the last non-null value up to the current row
output_df = output_df.withColumn("A", F.when(
    F.col("A").isNull(), F.last("A", ignorenulls=True).over(fill_window)
).otherwise(F.col("A")))
output_df = output_df.withColumn("B", F.when(
    F.col("B").isNull(), F.last("B", ignorenulls=True).over(fill_window)
).otherwise(F.col("B")))
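
Since the window already includes the current row, F.last with ignorenulls=True returns the row's own value whenever it is not null, so the when/otherwise wrapper is optional and the same result can be written more compactly:

# equivalent forward fill without the explicit null check
output_df = output_df.withColumn("A", F.last("A", ignorenulls=True).over(fill_window))
output_df = output_df.withColumn("B", F.last("B", ignorenulls=True).over(fill_window))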

With the sample dataset shared in the question:

output_df.printSchema()
output_df.show()

Your result may look like this:

root
 |-- datetimes: timestamp (nullable = true)
 |-- A: integer (nullable = true)
 |-- B: integer (nullable = true)

 ------------------- --- --- 
|          datetimes|  A|  B|
 ------------------- --- --- 
|2020-10-01 00:00:00|  1|  2|
|2020-10-01 00:00:01|  1|  2|
|2020-10-01 00:00:02|  2|  0|
|2020-10-01 00:00:03|  2|  0|
|2020-10-01 00:00:04|  2|  0|
|2020-10-01 00:00:05|  2|  0|
|2020-10-01 00:00:06|  2|  0|
|2020-10-01 00:00:07|  1|  5|
 ------------------- --- --- 
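
As a quick sanity check (just a sketch, assuming original_df has at most one row per timestamp), the gap-filled frame should contain exactly one row per second in the range:

# len(idx) is the number of seconds between the range endpoints, inclusive
assert output_df.count() == len(idx)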

Let me know if this works for you.
