How can I sum multiple columns in pyspark and return the max?


Let's say I have 20 columns like this:

df.columns = ['col1','col2','col3', ..., 'col20']

I am trying to sum all these columns and create a new column where the value of the new column will be 1, if the sum of all the above columns is >0 and 0 otherwise. I am currently doing it in two steps as shown here:

df = df.withColumn("temp_col", col("col1") col("col2") ... col("col20"))
df = df.withColumn("new_col_2", when(col("temp_col") > 0, 1).otherwise(0))

Is there any way to do this in one step, and in a better/cleaner way, so I don't need to type out all these column names?

I was trying to use something like this, but I got an error.

df.na.fill(0).withColumn("new_col" ,reduce(add, [col(col(f'{x}') for x in range(0,20))]))
An error was encountered:
name 'add' is not defined
Traceback (most recent call last):
NameError: name 'add' is not defined
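
For reference, the NameError is just a missing import: add lives in the operator module and, in Python 3, reduce lives in functools. With those imported, a rough sketch of this attempt (assuming the columns really are named col1 ... col20) would look like:

from functools import reduce
from operator import add
from pyspark.sql.functions import col, when

cols = [f'col{i}' for i in range(1, 21)]  # 'col1' ... 'col20'
df = df.na.fill(0).withColumn(
    "new_col",
    when(reduce(add, [col(c) for c in cols]) > 0, 1).otherwise(0)
)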

CodePudding user response:

You can do the following:

cols = ['col' + str(i) for i in range(1, 21)]  # ['col1', 'col2', ..., 'col20']

df['new_col'] = df[cols].sum(axis=1) > 0

If you want 1/0 instead of True/False, you can use .astype(int):

df['new_col'] = (df[cols].sum(axis=1) > 0).astype(int)
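
Note that this is the pandas API; if df is actually a PySpark DataFrame, a rough equivalent of the same sum-and-compare idea (assuming the same cols list) would be:

from functools import reduce
from operator import add
from pyspark.sql import functions as F

cols = ['col' + str(i) for i in range(1, 21)]
df = df.withColumn('new_col', (reduce(add, [F.col(c) for c in cols]) > 0).cast('int'))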

CodePudding user response:

From a Spark point of view, everything is fine using the two withColumns. If you are concerned about performance issues due to the one extra column, let Spark's Catalyst optimizer deal with that. If you already have a Python list with the columns to be summed up, you can build a SQL expression from it to save some typing:

from pyspark.sql import functions as F

cols = df.columns  # or cols = [f'col{c}' for c in range(1, 21)]
sum_expr = " + ".join(cols)
df.withColumn("temp_col", F.expr(sum_expr)) \
  .withColumn("new_col_2", F.when(F.col("temp_col") > 0, 1).otherwise(0)) \
  .show()
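
If you want the single-step version the question asks for, the same generated expression can go straight into the when; a sketch reusing sum_expr from above:

df.withColumn("new_col_2", F.when(F.expr(sum_expr) > 0, 1).otherwise(0)).show()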

A solution along the lines of your second attempt (but imho over-engineered) would be to calculate the sum with a combination of array and aggregate:

df.withColumn("temp_col", F.aggregate(F.array(cols), F.lit(0).cast("long"), lambda l,r: l r))

CodePudding user response:

cols=['col1','col2','col3', ..., 'col20']

Pandas

import numpy as np

df.assign(x=np.where(df.loc[:, cols].sum(axis=1) > 0, 1, 0))

Pyspark

from pyspark.sql.functions import array, expr, when

new = (df.withColumn('x', array(*cols))  # create an array of all required columns
         .withColumn('x', when(expr("reduce(x, cast(0 as double), (c, i) -> c + i)") > 0, 1).otherwise(0)))  # combine when and reduce
new.show()
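
One caveat that applies to all of the sum-based variants: in Spark, a null in any of the summed columns makes the whole sum null, and when(...).otherwise(0) then falls back to 0 even if the remaining columns are positive. If nulls can occur, filling them first (as the question's df.na.fill(0) already does) is the safer route; a sketch reusing the snippet above:

new = (df.na.fill(0)
         .withColumn('x', array(*cols))
         .withColumn('x', when(expr("reduce(x, cast(0 as double), (c, i) -> c + i)") > 0, 1).otherwise(0)))
new.show()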