I have this df:
df = spark.createDataFrame(
    [('row_a', 5.0, 0.0, 11.0),
     ('row_b', 3394.0, 0.0, 4543.0),
     ('row_c', 136111.0, 0.0, 219255.0),
     ('row_d', 0.0, 0.0, 0.0),
     ('row_e', 0.0, 0.0, 0.0),
     ('row_f', 42.0, 0.0, 54.0)],
    ['value', 'col_a', 'col_b', 'col_c']
)
I would like to use pandas' .quantile(0.25, axis=1), which would add one column:
import pandas as pd
pdf = df.toPandas()
pdf['25%'] = pdf.quantile(0.25, axis=1)
print(pdf)
#    value     col_a  col_b     col_c      25%
# 0  row_a       5.0    0.0      11.0      2.5
# 1  row_b    3394.0    0.0    4543.0   1697.0
# 2  row_c  136111.0    0.0  219255.0  68055.5
# 3  row_d       0.0    0.0       0.0      0.0
# 4  row_e       0.0    0.0       0.0      0.0
# 5  row_f      42.0    0.0      54.0     21.0
Performance is important to me, so I assume pandas_udf from pyspark.sql.functions could do this in a more optimized way. But I struggle to write a function that is both performant and convenient. This is my best attempt:
from pyspark.sql import functions as F
import pandas as pd
@F.pandas_udf('double')
def quartile1_on_axis1(a: pd.Series, b: pd.Series, c: pd.Series) -> pd.Series:
    pdf = pd.DataFrame({'a': a, 'b': b, 'c': c})
    return pdf.quantile(0.25, axis=1)
df = df.withColumn('25%', quartile1_on_axis1('col_a', 'col_b', 'col_c'))
I don't like that I need a separate argument for every column and then have to address each of those arguments individually inside the function just to build a DataFrame. All of these columns serve the same purpose, so IMHO there should be a way to address them all together, something like in this pseudocode:
def quartile1_on_axis1(*cols) -> pd.Series:
    pdf = pd.DataFrame(cols)
This way I could use this function for any number of columns.
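To make the intent concrete, the body I have in mind would be roughly this in plain pandas (pd.concat(cols, axis=1) rather than pd.DataFrame(cols), since the latter would stack the Series as rows). I'm just not sure whether pandas_udf accepts a *cols signature like this:
import pandas as pd

def quartile1_on_axis1(*cols: pd.Series) -> pd.Series:
    # line the incoming Series up as columns, then take the row-wise quantile
    pdf = pd.concat(cols, axis=1)
    return pdf.quantile(0.25, axis=1)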
Is it necessary to create a pd.DataFrame inside the UDF? To me this seems the same as doing it without a UDF (Spark df -> pandas df -> Spark df), as shown above, and without the UDF it's even shorter. Should I really try to make this work with pandas_udf? I thought it was designed specifically for this kind of purpose...
CodePudding user response:
I would use GroupedData with applyInPandas. Because applyInPandas requires you to pass the output schema, add a column with the required data type to the df, grab the resulting schema, and pass it when applying the function. Code below:
from pyspark.sql.functions import lit

# Generate the output schema by adding the new column
sch = df.withColumn('quantile25', lit(110.5)).schema

# udf
def quartile1_on_axis1(pdf):
    pdf = pdf.assign(quantile25=pdf.quantile(0.25, axis=1))
    return pdf

# apply udf
df.groupby('value').applyInPandas(quartile1_on_axis1, schema=sch).show()
# outcome
+-----+--------+-----+--------+----------+
|value|   col_a|col_b|   col_c|quantile25|
+-----+--------+-----+--------+----------+
|row_a|     5.0|  0.0|    11.0|       2.5|
|row_b|  3394.0|  0.0|  4543.0|    1697.0|
|row_c|136111.0|  0.0|219255.0|   68055.5|
|row_d|     0.0|  0.0|     0.0|       0.0|
|row_e|     0.0|  0.0|     0.0|       0.0|
|row_f|    42.0|  0.0|    54.0|      21.0|
+-----+--------+-----+--------+----------+
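A variant of the same idea, sketched here without testing, is mapInPandas (Spark 3.0+): it applies the same pandas logic per Arrow batch and skips groupby('value'), which otherwise shuffles the data into one-row groups. The function name quartile25_batch and the numeric_only=True flag are my additions; the schema trick is repeated so the snippet stands alone.
from pyspark.sql.functions import lit

# derive the output schema by adding the new column once
sch = df.withColumn('quantile25', lit(110.5)).schema

def quartile25_batch(iterator):
    # the iterator yields one pandas DataFrame per Arrow batch
    for pdf in iterator:
        # numeric_only=True keeps the string 'value' column out of the quantile
        yield pdf.assign(quantile25=pdf.quantile(0.25, axis=1, numeric_only=True))

df.mapInPandas(quartile25_batch, schema=sch).show()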
CodePudding user response:
The following seems to do what's required, but instead of pandas_udf it uses a regular udf. It would be great if I could employ pandas_udf in a similar way.
from pyspark.sql import functions as F
import numpy as np
@F.udf('double')
def lower_quart(*cols):
    return float(np.quantile(cols, 0.25))
df = df.withColumn('25%', lower_quart('col_a', 'col_b', 'col_c'))
df.show()
# +-----+--------+-----+--------+-------+
# |value|   col_a|col_b|   col_c|    25%|
# +-----+--------+-----+--------+-------+
# |row_a|     5.0|  0.0|    11.0|    2.5|
# |row_b|  3394.0|  0.0|  4543.0| 1697.0|
# |row_c|136111.0|  0.0|219255.0|68055.5|
# |row_d|     0.0|  0.0|     0.0|    0.0|
# |row_e|     0.0|  0.0|     0.0|    0.0|
# |row_f|    42.0|  0.0|    54.0|   21.0|
# +-----+--------+-----+--------+-------+
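For what it's worth, one way that seems to let pandas_udf handle an arbitrary list of columns is to pack them into a single array column with F.array, so the UDF receives one Series whose elements are each row's values. This is a sketch I haven't benchmarked, and lower_quart_pandas is just an illustrative name:
from pyspark.sql import functions as F
import numpy as np
import pandas as pd

@F.pandas_udf('double')
def lower_quart_pandas(rows: pd.Series) -> pd.Series:
    # each element of `rows` is one row's values packed into an array
    return rows.apply(lambda vals: float(np.quantile(vals, 0.25)))

cols = ['col_a', 'col_b', 'col_c']
df.withColumn('25%', lower_quart_pandas(F.array(*cols))).show()
Whether this actually beats the plain udf would need measuring, since the per-row work is the same and only the serialization path changes.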