I am looking for a way to add a column from one PySpark dataframe, let's say this is DF1:
column1 |
---|
123 |
234 |
345 |
to another PySpark dataframe, DF2, which can have any number of columns itself but not column1:
column2 | column3 | column4 |
---|---|---|
000 | data | some1 |
253774 | etc | etc |
1096 | null | more |
999 | other | null |
The caveat here is that I would like to avoid using Pandas, and I would like to avoid pulling all of the data into a single partition if possible. This will be up to terabytes of data on the DF2 side, running distributed on an EMR cluster.
DF1 will be a fixed set of numbers, which could be more or less than the row count of DF2. If DF2 has more rows, DF1 values should be repeated (think cycle). If DF1 has more rows, we don't exceed the rows in DF2; we just attach a value to each row (it doesn't matter if we include all of the rows from DF1).
If these requirements seem strange, it is because the values themselves are important in DF1 and we need to use them in DF2, but it doesn't matter which value from DF1 is attached to each DF2 row (we just don't want to repeat the same value over and over, though some duplicates are fine).
What I've Tried:
- I have tried adding a row_number to each dataframe to join them (sketched below), but we run into an issue with that when DF2 is larger than DF1.
- I tried duplicating DF1 x number of times to make it large enough to join to DF2 given a row_number, but this runs into Java heap space issues on the EMR cluster.
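For reference, the first attempt looked roughly like this (a minimal sketch; `df1` and `df2` stand for the example dataframes, and the window ordering is arbitrary):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Enumerate both sides; note a Window with no partitionBy funnels all
# rows through a single partition, which is part of the problem at scale.
w = Window.orderBy(F.lit(1))
df1_idx = df1.withColumn("rn", F.row_number().over(w))
df2_idx = df2.withColumn("rn", F.row_number().over(w))

# An inner join on rn silently drops every DF2 row whose rn
# exceeds DF1's row count.
joined = df2_idx.join(df1_idx, on="rn")
```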
What I am hoping to find:
I am looking for a way to simply cycle over the values from DF1 and apply them to each row of DF2, using native PySpark if possible.
In the end, an example result would look like this:
column1 | column2 | column3 | column4 |
---|---|---|---|
123 | 000 | data | some1 |
234 | 253774 | etc | etc |
345 | 1096 | null | more |
123 | 999 | other | null |
CodePudding user response:
The combination of the window functions `row_number` and `ntile` might be the answer:

- Apply `row_number` on DF1 to get all records enumerated as the new column `id`
- Get the count of records in DF1 and store it as `df1_count`
- Apply `ntile(df1_count)` on DF2 as the new column `id`. `ntile` will 'split' your DF2 rows into n groups of as equal size as possible
- Join DF1 and DF2 on the generated column `id` to combine both dataframes
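A minimal sketch of those steps, assuming the two dataframes are bound to `df1` and `df2` and using the example columns as arbitrary window orderings (any ordering works, since it doesn't matter which DF1 value lands on which DF2 row):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df1_count = df1.count()

# Enumerate DF1: row_number assigns id = 1..df1_count.
df1_indexed = df1.withColumn(
    "id", F.row_number().over(Window.orderBy("column1"))
)

# Bucket DF2 into df1_count near-equal groups: ntile also numbers
# the buckets 1..df1_count, so the ids line up for the join.
df2_indexed = df2.withColumn(
    "id", F.ntile(df1_count).over(Window.orderBy("column2"))
)

# Join on id and drop the helper column.
result = df2_indexed.join(df1_indexed, on="id").drop("id")
```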
Alternatively, instead of `ntile(n)`, DF2 can also get a `row_number()`-based column `id`, which can then be used to calculate a modulo. Since `row_number` starts at 1, shift by one so the result lands back in DF1's `id` range:

`df.withColumn("id_mod", (col("id") - 1) % lit(df1_count) + 1)`

That `id_mod` can then be joined with DF1 using `DF1.id`.
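A sketch of this variant under the same assumptions (`df1`, `df2`, and arbitrary ordering columns for the windows):

```python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

df1_count = df1.count()

df1_indexed = df1.withColumn(
    "id", F.row_number().over(Window.orderBy("column1"))
)
df2_indexed = df2.withColumn(
    "id", F.row_number().over(Window.orderBy("column2"))
)

# row_number is 1-based, so shift before the modulo and shift back:
# id_mod cycles 1, 2, ..., df1_count, 1, 2, ... down DF2's rows.
df2_mod = df2_indexed.withColumn(
    "id_mod", (F.col("id") - 1) % F.lit(df1_count) + 1
)

# Join DF2's cycled index against DF1's id, then drop the helpers.
result = (
    df2_mod.join(df1_indexed, df2_mod["id_mod"] == df1_indexed["id"])
    .drop("id", "id_mod")
)
```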