Home > Net >  Create column using Spark pandas_udf, without fixed number of input columns
Create column using Spark pandas_udf, without fixed number of input columns

Time:04-09

I have this df:

df = spark.createDataFrame(
    [('row_a', 5.0, 0.0, 11.0),
     ('row_b', 3394.0, 0.0, 4543.0),
     ('row_c', 136111.0, 0.0, 219255.0),
     ('row_d', 0.0, 0.0, 0.0),
     ('row_e', 0.0, 0.0, 0.0),
     ('row_f', 42.0, 0.0, 54.0)],
    ['value', 'col_a', 'col_b', 'col_c']
)

I would like to use .quantile(0.25, axis=1) from Pandas which would add one column:

import pandas as pd
pdf = df.toPandas()
pdf['25%'] = pdf.quantile(0.25, axis=1)
print(pdf)
#    value     col_a  col_b     col_c      25%
# 0  row_a       5.0    0.0      11.0      2.5
# 1  row_b    3394.0    0.0    4543.0   1697.0
# 2  row_c  136111.0    0.0  219255.0  68055.5
# 3  row_d       0.0    0.0       0.0      0.0
# 4  row_e       0.0    0.0       0.0      0.0
# 5  row_f      42.0    0.0      54.0     21.0

Performance to me is important, so I assume pandas_udf from pyspark.sql.functions could do it in a more optimized way. But I struggle to make a performant and useful function. This is my best attempt:

from pyspark.sql import functions as F
import pandas as pd
@F.pandas_udf('double')
def quartile1_on_axis1(a: pd.Series, b: pd.Series, c: pd.Series) -> pd.Series:
    pdf = pd.DataFrame({'a':a, 'b':b, 'c':c})
    return pdf.quantile(0.25, axis=1)

df = df.withColumn('25%', quartile1_on_axis1('col_a', 'col_b', 'col_c'))
  1. I don't like that I need an argument for every column and later in the function addressing those arguments separately to create a df. All of those columns serve the same purpose, so IMHO there should be a way to address them all together, something like in this pseudocode:

    def quartile1_on_axis1(*cols) -> pd.Series:
        pdf = pd.DataFrame(cols)
    

    This way I could use this function for any number of columns.

  2. Is it necessary to create a pd.Dataframe inside the UDF? To me this seems the same as without UDF (Spark df -> Pandas df -> Spark df), as shown above. Without UDF it's even shorter. Should I really try to make it work with pandas_udf? I think this was designed specifically for this kind of purpose...

CodePudding user response:

I would use GroupedData. Because this requires you pass the df's schema, add a column with the required datatype and get the schema. Pass that schema when required. Code below;

#Generate new schema by adding new column

sch =df.withColumn('quantile25',lit(110.5)).schema

#udf
def quartile1_on_axis1(pdf):
  
  pdf =pdf.assign(quantile25=pdf.quantile(0.25, axis=1))
 
  return pdf


 #apply udf 


df.groupby('value').applyInPandas(quartile1_on_axis1, schema=sch).show()


#outcome
 ----- -------- ----- -------- ---------- 
|value|   col_a|col_b|   col_c|quantile25|
 ----- -------- ----- -------- ---------- 
|row_a|     5.0|  0.0|    11.0|       2.5|
|row_b|  3394.0|  0.0|  4543.0|    1697.0|
|row_c|136111.0|  0.0|219255.0|   68055.5|
|row_d|     0.0|  0.0|     0.0|       0.0|
|row_e|     0.0|  0.0|     0.0|       0.0|
|row_f|    42.0|  0.0|    54.0|      21.0|
 ----- -------- ----- -------- ---------- 

CodePudding user response:

The following seems to do what's required, but instead of pandas_udf it uses a regular udf. It would be great if I could employ pandas_udf in a similar way.

from pyspark.sql import functions as F
import numpy as np

@F.udf('double')
def lower_quart(*cols):
    return float(np.quantile(cols, 0.25))
df = df.withColumn('25%', lower_quart('col_a', 'col_b', 'col_c'))

df.show()
# ----- -------- ----- -------- ------- 
#|value|   col_a|col_b|   col_c|    25%|
# ----- -------- ----- -------- ------- 
#|row_a|     5.0|  0.0|    11.0|    2.5|
#|row_b|  3394.0|  0.0|  4543.0| 1697.0|
#|row_c|136111.0|  0.0|219255.0|68055.5|
#|row_d|     0.0|  0.0|     0.0|    0.0|
#|row_e|     0.0|  0.0|     0.0|    0.0|
#|row_f|    42.0|  0.0|    54.0|   21.0|
# ----- -------- ----- -------- ------- 
  • Related