Hi, I have this Spark dataframe sample:
datetimes A B
2020-10-01 00:00:00 1 2
2020-10-01 00:00:02 2 0
2020-10-01 00:00:07 1 5
I want to build a time series model, so I need to fill in the datetimes that are missing from my dataframe. I'm new to Spark and don't know how to do this with it. I tried converting it to a pandas dataframe, but the data is too big to convert.
This is what I tried, but it can't be done this way in PySpark:
idx = pd.date_range('2020-12-31 23:59:58', '2021-09-20 08:59:59', freq='s')
df.index = pd.DatetimeIndex(df.index)
df = df.reindex(idx)
My second idea is to right join my dataframe with idx, but I don't think that's the best solution.
CodePudding user response:
Ideally, you want to make your date range available in Spark and perform the join and any other operations there, rather than pulling the data onto a single node.
You may try the following, which converts your pandas date range into a Spark dataframe and then left joins your existing Spark dataframe onto it to fill in the missing datetimes.
import pandas as pd
from pyspark.sql import functions as F
from pyspark.sql import Window
idx = pd.date_range('2020-12-31 23:59:58', '2021-09-20 08:59:59', freq='s')
# create spark dataframe with all possible timestamps
datetimes_df = spark.createDataFrame(pd.DataFrame(idx,columns=['datetimes']))
# assuming your original data is in `original_df`, left join data
output_df = datetimes_df.join(original_df,["datetimes"],"left")
If your dataframe of datetimes doesn't use too much memory, you may try broadcasting it to all nodes and see whether that improves the join's performance, e.g.
output_df = F.broadcast(datetimes_df).join(original_df,["datetimes"],"left")
Without filling the missing values, the result of the left join looks like this:
+-------------------+----+----+
|          datetimes|   A|   B|
+-------------------+----+----+
|2020-10-01 00:00:00|   1|   2|
|2020-10-01 00:00:01|null|null|
|2020-10-01 00:00:02|   2|   0|
|2020-10-01 00:00:03|null|null|
|2020-10-01 00:00:04|null|null|
|2020-10-01 00:00:05|null|null|
|2020-10-01 00:00:06|null|null|
|2020-10-01 00:00:07|   1|   5|
+-------------------+----+----+
You can fill the missing values with the last previously available value (a forward fill):
fill_window = Window.orderBy("datetimes").rowsBetween(Window.unboundedPreceding, Window.currentRow)
output_df = output_df.withColumn("A",F.when(
F.col("A").isNull(),F.last("A",True).over(fill_window)
).otherwise(F.col("A")))
output_df = output_df.withColumn("B",F.when(
F.col("B").isNull(),F.last("B",True).over(fill_window)
).otherwise(F.col("B")))
With the sample dataset shared in the question:
output_df.printSchema()
output_df.show()
Your result may look like this:
root
|-- datetimes: timestamp (nullable = true)
|-- A: integer (nullable = true)
|-- B: integer (nullable = true)
+-------------------+---+---+
|          datetimes|  A|  B|
+-------------------+---+---+
|2020-10-01 00:00:00|  1|  2|
|2020-10-01 00:00:01|  1|  2|
|2020-10-01 00:00:02|  2|  0|
|2020-10-01 00:00:03|  2|  0|
|2020-10-01 00:00:04|  2|  0|
|2020-10-01 00:00:05|  2|  0|
|2020-10-01 00:00:06|  2|  0|
|2020-10-01 00:00:07|  1|  5|
+-------------------+---+---+
Let me know if this works for you.