Does changing aggregate clause change anything in pandas_udf - pyspark?

I'm very new to Spark and I was wondering whether this changes anything regarding memory consumption and how the task is assigned to the workers. See below the minimal example so you can understand what I'm asking.

# imports for the pandas UDF
import pyspark.sql.functions as F
import pyspark.sql.types as T
# for creating minimal example
import pandas as pd
import numpy as np

# create a minimal example
df_minimal_example = pd.DataFrame({"x": np.arange(0, 50, 1), "y": np.arange(50, 100, 1)})
# create a random integer partition id (0 or 1)
df_minimal_example["PARTITION_ID"] = np.random.randint(0, 2, size=len(df_minimal_example))
sdf_minimal_example = spark.createDataFrame(df_minimal_example)

Let's print the first few rows of the output:

   x   y  PARTITION_ID
0  0  50             1
1  1  51             0
2  2  52             1
3  3  53             1
4  4  54             0

Now I define the pandas UDF so I can use my Python function in Spark:

schema =  T.StructType([T.StructField('xy', T.FloatType() ),
                        T.StructField('x2', T.FloatType() ),
                        T.StructField('y2', T.FloatType() ), 
                        T.StructField('PARTITION_ID', T.LongType() )
                       ]
                      )

@F.pandas_udf(schema, F.PandasUDFType.GROUPED_MAP)
def newfunction(pdf):
  pdf["xy"] = pdf["x"]*pdf["y"]
  pdf["x2"] = pdf["x"]*pdf["x"]
  pdf["y2"] = pdf["y"]*pdf["y"]
  cols2retrieve = ["PARTITION_ID","xy","x2","y2"]
  newpdf = pdf[cols2retrieve].copy()
  return newpdf
  
newpdf = sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction)
# to see the results
display(newpdf)
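
As a side note: on Spark 3.x, PandasUDFType.GROUPED_MAP is deprecated and the same grouped map is usually written with GroupedData.applyInPandas. A minimal sketch, assuming Spark 3.0+ and reusing the schema defined above (this does not change the question about the number of groups):

# same grouped-map logic as newfunction, written as a plain Python
# function and handed to applyInPandas (available since Spark 3.0)
def newfunction_plain(pdf):
    pdf["xy"] = pdf["x"] * pdf["y"]
    pdf["x2"] = pdf["x"] * pdf["x"]
    pdf["y2"] = pdf["y"] * pdf["y"]
    return pdf[["PARTITION_ID", "xy", "x2", "y2"]].copy()

newpdf_v2 = sdf_minimal_example.groupby("PARTITION_ID").applyInPandas(newfunction_plain, schema=schema)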

As you see, I use .groupby("PARTITION_ID") when applying the pandas UDF, and the column "PARTITION_ID" contains either 0 or 1. The question is: what if PARTITION_ID contains integers between 0 and 100? For example:

# instead of this
df_minimal_example["PARTITION_ID"] = np.random.randint(0, 2, size=len(df_minimal_example))
# use this
df_minimal_example["PARTITION_ID"] = np.random.randint(0, 100, size=len(df_minimal_example))
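
Each distinct PARTITION_ID value becomes one group, i.e. one pandas DataFrame handed to newfunction. A quick check (not part of the original snippet) to see how many groups that gives:

# number of groups = number of distinct PARTITION_ID values:
# roughly 2 in the first case, up to 100 in the second
sdf_minimal_example.select("PARTITION_ID").distinct().count()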

Does this change anything regarding memory issues and how the task is assigned to each worker? If anyone could provide a little more information about this, that'd be nice.

CodePudding user response:

groupby is a wide transformation in Spark, which means the data needs to be shuffled; this operation is usually memory-intensive.
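
You can see that shuffle in the physical plan; a minimal sketch (the exact plan text depends on your Spark version), look for an Exchange node:

# print the physical plan of the grouped-map step; the Exchange node
# is the shuffle introduced by groupby("PARTITION_ID")
sdf_minimal_example.groupby("PARTITION_ID").apply(newfunction).explain()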

How changing the number of distinct keys from 2 to 100 will impact performance is hard to tell in advance, because it depends on the physical partitioning of the data.
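
As a starting point, you can check how many physical partitions the DataFrame currently has (a quick check, not from the original answer):

# number of physical partitions the rows are currently spread over;
# this is what the shuffle redistributes by PARTITION_ID
sdf_minimal_example.rdd.getNumPartitions()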

You can repartition your data by this PARTITION_ID column, and it could speed up operations down the line if you leverage this column for joins or groupby.
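
For example, a minimal sketch of repartitioning by the grouping column before the grouped-map UDF (whether this actually helps depends on your data volume and cluster):

# co-locate rows with the same PARTITION_ID before the grouped-map UDF runs,
# so the following groupby on that column may not need another shuffle
sdf_repartitioned = sdf_minimal_example.repartition("PARTITION_ID")
newpdf = sdf_repartitioned.groupby("PARTITION_ID").apply(newfunction)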

I say "could" because there's a trade off and having a lot of small files might impact performance on other activities, so it's not as straight forward as just repartitioning on the right column to see performance improve.

See this post for more detail.
