Best way to groupby and join in pyspark


Hi, I have two dataframes like this:

df_1:

id   item   activity
1      2       a
34    14       b
1      2       b
 .     .       .

Activity has two unique values, a and b.

df_2:

id   item   activity
1      2       c
34    14       c
1      2       c

Here activity has the same value c in every row.

Now I want a final df where I group by id and item, get the count of each activity from df_1 and df_2, and then join the results on id and item.

df_1_grp (group by id and item and count how often each activity occurs):

import pyspark.sql.functions as f

df_1_grp = df_1.groupby("id", "item").agg(
    f.count(f.when(f.col('activity') == 'a', 1)).alias('a'),
    f.count(f.when(f.col('activity') == 'b', 1)).alias('b'))
id  item  a   b
1     2   1   1
34   14   0   1

df_2_grp (group by id and item and just get the record count, since all values in activity are the same):

df_2_grp = df_2.groupBy("id", "item").count().select('id', 'item', f.col('count').alias('c'))
id  item  c
1     2   2  
34   14   1   

And now join them to get the final df:

df = df_1_grp.join(df_2_grp, on=['id', 'item'], how='inner')

Expected Output:

id  item  a   b   c
1     2   1   1   2
34   14   0   1   1

Now, because my dataframes are very big (roughly 4 TB, or about 1 billion records), I'm running out of disk storage. Is there a more optimized and efficient way of doing this?

CodePudding user response:

You can change the spark config to allow for more memory. Can you clarify if it is storage space you're lacking or memory when running the program?
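
For example, a minimal sketch of raising the memory settings when the session is created; the values below are placeholders and depend on what your cluster actually has:

from pyspark.sql import SparkSession

# Placeholder values - size these to your actual executors and driver.
spark = (
    SparkSession.builder
    .appName("groupby-join")
    .config("spark.executor.memory", "16g")   # heap per executor
    .config("spark.driver.memory", "8g")      # heap for the driver
    .config("spark.memory.fraction", "0.8")   # share of heap for execution/storage
    .getOrCreate()
)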

One method to reduce the memory footprint of the Spark session is to save each table to disk before joining them. This helps runtime and memory usage (or at least it did for me). Be sure to create a new Spark session after each save.
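
A rough sketch of that approach, assuming Parquet output; the paths are placeholders, so adjust them to your storage (HDFS, S3, local disk, ...):

# Materialize both aggregated tables before the join.
df_1_grp.write.mode("overwrite").parquet("/tmp/df_1_grp")
df_2_grp.write.mode("overwrite").parquet("/tmp/df_2_grp")

# Reading them back truncates the lineage, so the join no longer has to
# recompute the upstream aggregations.
df_1_grp = spark.read.parquet("/tmp/df_1_grp")
df_2_grp = spark.read.parquet("/tmp/df_2_grp")

df = df_1_grp.join(df_2_grp, on=['id', 'item'], how='inner')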

CodePudding user response:

The join is redundant; a much more efficient way is to first union the two dataframes and then perform the groupBy (union is a cheap operation that doesn't require a shuffle).

my_df = df_1.union(df_2)
df = my_df.groupby("id", "item").agg(
    f.count(f.when(f.col('activity') == 'a', 1)).alias('a'),
    f.count(f.when(f.col('activity') == 'b', 1)).alias('b'),
    f.count(f.when(f.col('activity') == 'c', 1)).alias('c'),
)

Also, for large-table shuffles like yours, you need to verify that you have adequate values for spark.sql.shuffle.partitions and spark.default.parallelism - at least 2000 in the case of a 4 TB table (the default is 200).
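
A sketch of setting those values, assuming you control session creation; 2000 follows the suggestion above and is not a universal number:

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.sql.shuffle.partitions", "2000")  # partitions for groupBy/join shuffles
    .config("spark.default.parallelism", "2000")     # parallelism for RDD operations
    .getOrCreate()
)

# spark.sql.shuffle.partitions can also be changed on an existing session:
spark.conf.set("spark.sql.shuffle.partitions", "2000")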
