PySpark Big data question - How to add column from another dataframe (no common join column)


I am looking for a way to add a column from one PySpark dataframe, let's say this is DF1:

column1
123
234
345

to another PySpark dataframe, DF2, which can have any number of columns itself, but not column1:

column2 column3 column4
000 data some1
253774 etc etc
1096 null more
999 other null

The caveat here is that I would like to avoid using Pandas, and I would like to avoid pulling all of the data into a single partition if possible. This will be up to terabytes of data on the DF2 side, and it will run distributed on an EMR cluster.

DF1 will be a fixed set of numbers, which could be more or fewer than the row count of DF2. If DF2 has more rows, the DF1 values should be repeated (think cycle). If DF1 has more rows, we don't exceed the rows in DF2; we just attach a value to each row (it doesn't matter whether we use all of the rows from DF1).

If these requirements seem strange, it is because the values in DF1 are important in themselves and we need to use them in DF2, but it doesn't matter which value from DF1 is attached to each DF2 row (we just don't want to repeat the same value over and over, though some duplicates are fine).

What I've Tried:

  1. I have tried adding a row_number to each dataframe to join on, but we run into an issue when DF2 is larger than DF1.
  2. I tried duplicating DF1 x number of times to make it large enough to join to DF2 on a row_number, but this runs into Java heap space issues on the EMR cluster.

What I am hoping to find:

I am looking for a way to simply cycle over the values from DF1 and apply them to each row of DF2, using native PySpark if possible.

In the end an example would look like this:

column1 column2 column3 column4
123 000 data some1
234 253774 etc etc
345 1096 null more
123 999 other null

CodePudding user response:

The combination of the window functions row_number and ntile might be the answer (a sketch follows the steps):

  1. Apply row_number on DF1 to enumerate all records in a new column id

  2. Get the count of records in DF1 and store it as df1_count

  3. Apply ntile(df1_count) on DF2 as the new column id. ntile will split your DF2 rows into n groups of as equal size as possible

  4. Join DF1 and DF2 on the newly generated column id to combine both dataframes
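A minimal sketch of those steps, assuming the dataframes are named df1 and df2 as in the question and a SparkSession already exists (the ordering columns are arbitrary choices, not part of the original answer):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# 1. Enumerate DF1's rows as id = 1..df1_count.
w1 = Window.orderBy("column1")
df1_with_id = df1.withColumn("id", F.row_number().over(w1))

# 2. Count of records in DF1.
df1_count = df1.count()

# 3. ntile(df1_count) numbers DF2's rows 1..df1_count in near-equal
#    groups, matching the id range produced by row_number on DF1.
w2 = Window.orderBy("column2")
df2_with_id = df2.withColumn("id", F.ntile(df1_count).over(w2))

# 4. Join both dataframes on the generated id.
result = df2_with_id.join(df1_with_id, on="id", how="left").drop("id")

One caveat: a window with no partitionBy moves all rows to a single partition (Spark logs a warning about this), so on terabyte-scale DF2 this step can become a bottleneck.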

Alternatively, instead of ntile(n), DF2 can also get a row_number()-based column id, which can then be used to compute a modulo. Since row_number starts at 1, shift by one so the result stays in DF1's id range 1..df1_count:

df.withColumn("id_mod", (col("id") - 1) % lit(df1_count) + 1)

and then that id_mod can be joined against DF1's id.
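A sketch of this variant, reusing df1_with_id and df1_count from the snippet above (the names are assumptions for illustration):

from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Enumerate DF2's rows as id = 1..N.
w = Window.orderBy("column2")
df2_with_id = df2.withColumn("id", F.row_number().over(w))

# Cycle DF2's id onto DF1's id range 1..df1_count.
df2_cycled = df2_with_id.withColumn(
    "id_mod", (F.col("id") - 1) % F.lit(df1_count) + 1
)

# Join against DF1's id (renamed to id_mod) and drop the helper columns.
result = (
    df2_cycled
    .join(df1_with_id.withColumnRenamed("id", "id_mod"), on="id_mod", how="left")
    .drop("id", "id_mod")
)

The same single-partition caveat applies here, since row_number over an unpartitioned window also collects all rows into one partition.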
