Filter dataframe based on time range in another dataframe

I have two PySpark dataframes: df1 with 10,000 rows and df2 with 500 million rows. I am trying to filter df2 based on the values in df1 by iterating over df1 one row at a time. For example: for the first row of df1, I need to keep the rows of df2 whose timestamp falls between that row's begin_time and end_time, and whose node_name matches one of the entries in that row's node_name array. This then has to be repeated for every row of df1.

+----+-------------------+-------------------+-------------------------------------------------------------+
|id  |begin_time         |end_time           |node_name                                                    |
+----+-------------------+-------------------+-------------------------------------------------------------+
|1182|2021-02-01 00:01:00|2021-02-01 00:02:20|[x24n15, x1117, x1108, x1113, x11n02, x11n03, x11n11, x11n32]|
|1183|2021-02-01 00:02:40|2021-02-01 00:03:50|[x28n02, x1112, x1109, x1110]                                |
|1184|2021-02-01 00:04:10|2021-02-01 00:07:10|[x32n10, x34n13, x13n16, x32n09, x28n01]                     |
|1185|2021-02-01 00:05:00|2021-02-01 00:06:30|[x50n09, x50n08]                                             |
|1186|2021-02-01 00:07:00|2021-02-01 00:08:20|[x50n08]                                                     |
+----+-------------------+-------------------+-------------------------------------------------------------+

The dataframe df2 has 500 million rows and around 100 columns; for simplicity I am showing only 3 of them:

+-------------------+---------+----------+
|timestamp          |node_name|sensor_val|
+-------------------+---------+----------+
|2021-02-01 00:01:00|x24n15   |23.5      |
|2021-02-01 00:01:00|x24n15   |23.5      |
|2021-02-01 00:01:00|x24n16   |23.5      |
|2021-02-01 00:01:00|x24n17   |23.5      |
|2021-02-01 00:01:00|x24n18   |23.5      |
|2021-02-01 00:01:00|x24n19   |23.5      |
|2021-02-01 00:01:00|x24n20   |23.5      |
|2021-02-01 00:01:01|x24n15   |23.5      |
|2021-02-01 00:01:01|x24n15   |23.5      |
|2021-02-01 00:01:01|x24n16   |23.5      |
|2021-02-01 00:01:01|x24n17   |23.5      |
|2021-02-01 00:01:01|x24n18   |23.5      |
|2021-02-01 00:01:01|x24n19   |23.5      |
|2021-02-01 00:01:01|x24n20   |23.5      |
+-------------------+---------+----------+

The resultant table that I need:

+-------------------+---------+----------+----+
|timestamp          |node_name|sensor_val|id  |
+-------------------+---------+----------+----+
|2021-02-01 00:01:00|x24n15   |23.5      |1182|
|2021-02-01 00:01:00|x24n15   |23.5      |1182|
|2021-02-01 00:01:00|x24n16   |23.5      |1182|
|2021-02-01 00:01:00|x24n17   |23.5      |1183|
|2021-02-01 00:01:00|x24n18   |23.5      |1183|
|2021-02-01 00:01:00|x24n19   |23.5      |1184|
|2021-02-01 00:01:00|x24n20   |23.5      |1184|
|2021-02-01 00:01:01|x24n15   |23.5      |1184|
|2021-02-01 00:01:01|x24n15   |23.5      |1184|
|2021-02-01 00:01:01|x24n16   |23.5      |1185|
|2021-02-01 00:01:01|x24n17   |23.5      |1185|
|2021-02-01 00:01:01|x24n18   |23.5      |1185|
|2021-02-01 00:01:01|x24n19   |23.5      |1185|
|2021-02-01 00:01:01|x24n20   |23.5      |1185|
+-------------------+---------+----------+----+
plus many more rows (on the order of hundreds of millions)

What I have tried so far is something like the following, but it is very slow and is probably not usable in production.

from functools import reduce
from pyspark.sql import DataFrame
import pyspark.sql.functions as F

SeriesAppend = []
for row in df1.rdd.toLocalIterator():
    bt, et = row.begin_time, row.end_time
    nodes = ", ".join(f"'{n}'" for n in row.node_name)
    # one query per df1 row: restrict df2 to this row's time window and node list
    temp_row_df = spark.sql(
        f"SELECT timestamp_t, node_name, sensor_val FROM df2_table "
        f"WHERE timestamp_t >= '{bt}' AND timestamp_t < '{et}' "
        f"AND node_name IN ({nodes})")
    # tag the matching rows with this df1 row's id
    temp_row_df = temp_row_df.withColumn("id", F.lit(row.id))
    SeriesAppend.append(temp_row_df)

df_series = reduce(DataFrame.unionAll, SeriesAppend)

CodePudding user response:

You should prep the dataframes so that you can perform a join between the two.

  1. In df1, create a new column, say "time_gap", holding the combined [begin_time - end_time] value.

  2. Collect all distinct "time_gap" values (i.e. the [begin_time - end_time] windows) into an array.

  3. In df1, explode the "node_name" column into "node_name2", so that each row corresponds to one node_name2.

  4. In df2, create a new column called "time_gaps" holding the time_gap value(s) that each row's timestamp falls into.

  5. In df2, explode "time_gaps" into a column, say "time_gap", so each row corresponds to one time_gap.

  6. You can now join df1 and df2 on [df1.node_name2 == df2.node_name, df1.time_gap == df2.time_gap] and then perform your required tasks, as in the sketch below.
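
A minimal PySpark sketch of the steps above, assuming the schemas shown in the question; the names df1_prep, df2_prep, gaps and gap_expr are invented here for illustration:

import pyspark.sql.functions as F

# steps 1 and 3: tag each df1 row with its window string and explode the node list
df1_prep = (df1
    .withColumn('time_gap', F.concat_ws('/', 'begin_time', 'end_time'))
    .withColumn('node_name2', F.explode('node_name')))

# step 2: df1 is small (10,000 rows), so the distinct windows fit on the driver
gaps = df1_prep.select('time_gap', 'begin_time', 'end_time').distinct().collect()

# step 4: for every df2 row, collect each window that contains its timestamp
gap_expr = F.array(*[
    F.when((F.col('timestamp') >= g.begin_time) & (F.col('timestamp') <= g.end_time),
           F.lit(g.time_gap))
    for g in gaps])

# step 5: drop the non-matching (null) entries, then explode to one window per row
df2_prep = (df2
    .withColumn('time_gaps', gap_expr)
    .withColumn('time_gap', F.explode(F.expr('filter(time_gaps, x -> x is not null)'))))

# step 6: a plain equi-join on (node, window)
result = (df2_prep
    .join(df1_prep, (df2_prep.node_name == df1_prep.node_name2) &
                    (df2_prep.time_gap == df1_prep.time_gap))
    .select('timestamp', df2_prep.node_name, 'sensor_val', 'id'))

Note that with 10,000 windows, gap_expr becomes a very wide expression evaluated against every df2 row, so this sketch favors clarity over speed; bucketing timestamps (e.g. to the hour) before matching would cut the per-row work considerably.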

CodePudding user response:

Option 1 - probably inefficient on tables as big as yours, but good on smaller ones:

import pyspark.sql.functions as F

# explode the node list, then range-join on node and on the timestamp window
df1_2 = df1.withColumn('node_name', F.explode('node_name'))
df = df2.join(df1_2, [df1_2.node_name == df2.node_name,
                      df1_2.begin_time <= df2.timestamp,
                      df2.timestamp <= df1_2.end_time], 'left')
df = df.select(df2['*'], 'id')
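
One tweak worth considering here (my addition, not part of the answer above): df1_2 stays small even after the explode, so a broadcast hint lets Spark ship it to every executor instead of shuffling the 500-million-row df2 for this non-equi join:

from pyspark.sql.functions import broadcast

# same range join as above, with the small side broadcast to the executors
df = df2.join(broadcast(df1_2),
              [df1_2.node_name == df2.node_name,
               df1_2.begin_time <= df2.timestamp,
               df2.timestamp <= df1_2.end_time], 'left')
df = df.select(df2['*'], 'id')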

Option 2

import pyspark.sql.functions as F

# one row per (id, node, second) covered by each df1 window
df1_2 = (df1
    .withColumn('node_name', F.explode('node_name'))
    .withColumn('unix_time', F.explode(F.sequence(
        F.unix_timestamp('begin_time'), F.unix_timestamp('end_time'))))
)
df2_2 = df2.withColumn('unix_time', F.unix_timestamp('timestamp'))
# plain equi-join on (node_name, unix_time), which Spark can hash-join efficiently
df = df2_2.join(df1_2, ['node_name', 'unix_time'], 'left')
df = df.select(df2['*'], 'id')
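
Two caveats with Option 2 that the answer leaves implicit: F.sequence enumerates one unix_time per second, so the equi-join only matches timestamps that fall on whole seconds (true of the sample data), and the left join keeps unmatched df2 rows with a null id. If only the matching rows are wanted, as the expected output suggests, an inner join does the filtering directly:

# keep only the df2 rows that fall inside some df1 window
df = df2_2.join(df1_2, ['node_name', 'unix_time'], 'inner')
df = df.select(df2['*'], 'id')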