I have two pyspark dataframes: df1 with 10,000 rows and df2 with 500 million rows. I am trying to filter df2 based on the values in df1 by iterating over df1 one row at a time. For example, for the first row of df1 I need to keep only the rows of df2 whose timestamp falls between that row's begin_time and end_time and whose node_name appears in that row's node_name array, and then repeat this for every row of df1. Here is df1:
| id   | begin_time          | end_time            | node_name                                                     |
|------|---------------------|---------------------|---------------------------------------------------------------|
| 1182 | 2021-02-01 00:01:00 | 2021-02-01 00:02:20 | [x24n15, x1117, x1108, x1113, x11n02, x11n03, x11n11, x11n32] |
| 1183 | 2021-02-01 00:02:40 | 2021-02-01 00:03:50 | [x28n02, x1112, x1109, x1110]                                 |
| 1184 | 2021-02-01 00:04:10 | 2021-02-01 00:07:10 | [x32n10, x34n13, x13n16, x32n09, x28n01]                      |
| 1185 | 2021-02-01 00:05:00 | 2021-02-01 00:06:30 | [x50n09, x50n08]                                              |
| 1186 | 2021-02-01 00:07:00 | 2021-02-01 00:08:20 | [x50n08]                                                      |
The dataframe df2 has 500 million rows and around 100 columns; for simplicity only 3 columns are shown:
timestamp | node_name | sensor_val |
---|---|---|
2021-02-01 00:01:00 | x24n15 | 23.5 |
2021-02-01 00:01:00 | x24n15 | 23.5 |
2021-02-01 00:01:00 | x24n16 | 23.5 |
2021-02-01 00:01:00 | x24n17 | 23.5 |
2021-02-01 00:01:00 | x24n18 | 23.5 |
2021-02-01 00:01:00 | x24n19 | 23.5 |
2021-02-01 00:01:00 | x24n20 | 23.5 |
2021-02-01 00:01:01 | x24n15 | 23.5 |
2021-02-01 00:01:01 | x24n15 | 23.5 |
2021-02-01 00:01:01 | x24n16 | 23.5 |
2021-02-01 00:01:01 | x24n17 | 23.5 |
2021-02-01 00:01:01 | x24n18 | 23.5 |
2021-02-01 00:01:01 | x24n19 | 23.5 |
2021-02-01 00:01:01 | x24n20 | 23.5 |
The resultant table that I need:
timestamp | nodes | sensor_val | id |
---|---|---|---|
2021-02-01 00:01:00 | x24n15 | 23.5 | 1182 |
2021-02-01 00:01:00 | x24n15 | 23.5 | 1182 |
2021-02-01 00:01:00 | x24n16 | 23.5 | 1182 |
2021-02-01 00:01:00 | x24n17 | 23.5 | 1183 |
2021-02-01 00:01:00 | x24n18 | 23.5 | 1183 |
2021-02-01 00:01:00 | x24n19 | 23.5 | 1184 |
2021-02-01 00:01:00 | x24n20 | 23.5 | 1184 |
2021-02-01 00:01:01 | x24n15 | 23.5 | 1184 |
2021-02-01 00:01:01 | x24n15 | 23.5 | 1184 |
2021-02-01 00:01:01 | x24n16 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n17 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n18 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n19 | 23.5 | 1185 |
2021-02-01 00:01:01 | x24n20 | 23.5 | 1185 |
... many more rows (on the order of hundreds of millions)
What I have tried so far is something like the following, but it is very slow and probably cannot be used in production.
from functools import reduce
import pyspark.sql.functions as F
from pyspark.sql import DataFrame

SeriesAppend = []
for row in df1.rdd.toLocalIterator():
    bt = row.begin_time
    et = row.end_time
    nodes = ", ".join(f"'{n}'" for n in row.node_name)
    temp_row_df = spark.sql(
        f"SELECT timestamp_t, node_name, sensor_val FROM df2_table "
        f"WHERE timestamp_t >= '{bt}' AND timestamp_t <= '{et}' AND node_name IN ({nodes})"
    )
    temp_row_df = temp_row_df.withColumn("id", F.lit(row.id))
    SeriesAppend.append(temp_row_df)

df_series = reduce(DataFrame.unionAll, SeriesAppend)
CodePudding user response:
You should prep the two dataframes so that you can perform a plain join between them; a sketch of these steps follows the list below.
1. In df1, create a new column, say "time_gap", holding the [begin_time - end_time] value.
2. Collect all distinct "time_gap" values (i.e. [begin_time - end_time] pairs) into an array.
3. In df1, explode the "node_name" column into "node_name2", so that each row corresponds to one node_name2.
4. In df2, create a new column "time_gaps" holding the time_gap value(s) whose interval the row's timestamp falls into.
5. In df2, explode "time_gaps" into a column, say "time_gap", so that each row corresponds to one time_gap.
6. You can now join df1 X df2 on [df1.node_name2 == df2.node_name, df1.time_gap == df2.time_gap] and then perform your required tasks.
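A minimal sketch of these steps, assuming df1 and df2 have exactly the columns shown in the question; the '/' separator used to build the interval label and the names time_gap, time_gaps and node_name2 are illustrative choices, not fixed by the steps above:

import pyspark.sql.functions as F

# steps 1-2: label each (begin_time, end_time) pair and collect the distinct labels to the driver
df1_2 = df1.withColumn('time_gap', F.concat_ws('/', 'begin_time', 'end_time'))
gaps = [r['time_gap'] for r in df1_2.select('time_gap').distinct().collect()]

# step 3: explode node_name so each df1 row carries a single node_name2
df1_2 = df1_2.withColumn('node_name2', F.explode('node_name'))

# steps 4-5: for each df2 row keep every gap whose interval contains the timestamp, then explode
gap_cols = [F.when(F.col('timestamp').between(*g.split('/')), F.lit(g)) for g in gaps]
df2_2 = (df2
         .withColumn('time_gaps', F.array(*gap_cols))
         .withColumn('time_gaps', F.expr('filter(time_gaps, x -> x is not null)'))
         .withColumn('time_gap', F.explode('time_gaps')))

# step 6: equi-join on the exploded node name and the interval label
result = (df2_2
          .join(df1_2, (df2_2.node_name == df1_2.node_name2) & (df2_2.time_gap == df1_2.time_gap))
          .select(df2_2['*'], 'id'))

Because the range condition has been folded into the time_gap label, the final join is a plain equi-join on (node_name, time_gap), which Spark can execute as a hash or sort-merge join instead of evaluating a timestamp range for every pair of rows.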
CodePudding user response:
Option 1 - probably inefficient on tables as big as yours, but fine on smaller ones:
import pyspark.sql.functions as F

# explode node_name, then join on node-name equality plus the timestamp range condition
df1_2 = df1.withColumn('node_name', F.explode('node_name'))
df = df2.join(df1_2, [df1_2.node_name == df2.node_name, df1_2.begin_time <= df2.timestamp, df2.timestamp <= df1_2.end_time], 'left')
df = df.select(df2['*'], 'id')
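Note that with a 'left' join, df2 rows that fall into no interval are kept with a null id; if, as in the expected output, only the matching rows are needed, an 'inner' join can be used instead.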
Option 2
df1_2 = (df1
    .withColumn('node_name', F.explode('node_name'))
    # expand every interval to one row per second between begin_time and end_time
    .withColumn('unix_time', F.explode(F.sequence(F.unix_timestamp('begin_time'), F.unix_timestamp('end_time'))))
)
df2_2 = df2.withColumn('unix_time', F.unix_timestamp('timestamp'))
df = df2_2.join(df1_2, ['node_name', 'unix_time'], 'left')
df = df.select(df2['*'], 'id')
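Option 2 turns the range condition into an equi-join by expanding each df1 interval into one row per second and matching df2 rows on the second-truncated unix timestamp. Even after exploding, df1_2 stays comparatively small (10,000 rows times a handful of nodes times the interval length in seconds), so it may be worth broadcasting it explicitly; this hint is an addition, not part of the original answer:

# hypothetical variant: broadcast the small exploded side to avoid shuffling the 500M-row df2
df = df2_2.join(F.broadcast(df1_2), ['node_name', 'unix_time'], 'left')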