Optimizing "withColumn when otherwise" performance in pyspark

Time:10-21

I am working on a project with PySpark on Databricks. Part of my code (below) reformats a string based on a date written in French.

The existing code, besides being verbose, is causing performance issues such as:

  • not being able to display the dataframe, having a constant "running command"
  • causing "Driver is up but is not responsive, likely due to GC."

Only CSV files are used in this project (for reading and writing). No database is used.

I'm trying to handle the formatting task in a better way to avoid the performance and memory issues. Any suggestions?

Thanks a lot !

from pyspark.sql.functions import when

courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2020","XXX0120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2020","XXX0220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2020","XXX0320").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2020","XXX0420").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2020","XXX0520").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2020","XXX0620").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2020","XXX0720").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2020","XXX0820").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2020","XXX0920").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2020","XXX1020").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2020","XXX1120").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2020","XXX1220").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Janvier 2021","XXX0121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Fevrier 2021","XXX0221").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mars 2021","XXX0321").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Avril 2021","XXX0421").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Mai 2021","XXX0521").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juin 2021","XXX0621").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Juillet 2021","XXX0721").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Aout 2021","XXX0821").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Septembre 2021","XXX0921").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Octobre 2021","XXX1021").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Novembre 2021","XXX1121").otherwise(courriers["Vague"]))
courriers = courriers.withColumn('Vague',when(courriers["Vague"] == "XXX Decembre 2021","XXX1221").otherwise(courriers["Vague"]))

CodePudding user response:

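It's much easier to generate the full condition programmatically than to apply it one value at a time. withColumn is known to perform badly when it is called many times in a row: each call adds another projection to the query plan, and building and analyzing that growing plan happens on the driver.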

The simplest way will be to define a mapping and generate condition from it, like this:

dates = {"XXX Janvier 2020":"XXX0120",
         "XXX Fevrier 2020":"XXX0220",
         "XXX Mars 2020":"XXX0320",
         "XXX Avril 2020":"XXX0420",
         "XXX Mai 2020":"XXX0520",
         "XXX Juin 2020":"XXX0620",
         "XXX Juillet 2020":"XXX0720",
         "XXX Aout 2020":"XXX0820",
         "XXX Septembre 2020":"XXX0920",
         "XXX Octobre 2020":"XXX1020",
         "XXX Novembre 2020":"XXX1120",
         "XXX Decembre 2020":"XXX1220",
         "XXX Janvier 2021":"XXX0121",
         "XXX Fevrier 2021":"XXX0221",
         "XXX Mars 2021":"XXX0321",
         "XXX Avril 2021":"XXX0421",
         "XXX Mai 2021":"XXX0521",
         "XXX Juin 2021":"XXX0621",
         "XXX Juillet 2021":"XXX0721",
         "XXX Aout 2021":"XXX0821",
         "XXX Septembre 2021":"XXX0921",
         "XXX Octobre 2021":"XXX1021",
         "XXX Novembre 2021":"XXX1121",
         "XXX Decembre 2021":"XXX1221"
         }
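
The mapping does not have to be typed by hand. As a minimal sketch, assuming the values always follow the "prefix, month name, four-digit year" shape shown in the question, it could be generated from a list of month names (the names MONTHS and build_dates are illustrative and not part of the original answer):

```python
# French month names as spelled in the data (no accents), in calendar order.
MONTHS = ["Janvier", "Fevrier", "Mars", "Avril", "Mai", "Juin",
          "Juillet", "Aout", "Septembre", "Octobre", "Novembre", "Decembre"]


def build_dates(years, prefix="XXX"):
    """Return a dict such as {"XXX Janvier 2020": "XXX0120", ...}."""
    return {
        # key: "XXX Janvier 2020", value: prefix + two-digit month + two-digit year
        f"{prefix} {month} {year}": f"{prefix}{index:02d}{year % 100:02d}"
        for year in years
        for index, month in enumerate(MONTHS, start=1)
    }


dates = build_dates([2020, 2021])
```

With years 2020 and 2021 this yields the same 24 entries as the hand-written dictionary, so the rest of the answer applies unchanged.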

and from it we can generate our condition for all possible values:

import pyspark.sql.functions as F
cl = None
for k,v in dates.items():
    if cl is None:
        cl = F.when(F.col("Vague") == k, F.lit(v))
    else:
        cl = cl.when(F.col("Vague") == k, F.lit(v))

cl = cl.otherwise(F.col("Vague")).alias("Vague")

It can then be used as follows:

df = spark.createDataFrame([["XXX Fevrier 2021"], ["22332"]], 
   schema="Vague string")
df.select(cl).show()

which gives the expected result:

+-------+
|  Vague|
+-------+
|XXX0221|
|  22332|
+-------+

Ideally, it could be generalized to work with any year, by using regular expressions, like this:

# Raw strings keep Python from treating "\d" as a string escape sequence
dates = {r"XXX Janvier 20(\d{2})":"XXX01$1",
         r"XXX Fevrier 20(\d{2})":"XXX02$1",
         r"XXX Mars 20(\d{2})":"XXX03$1",
         r"XXX Avril 20(\d{2})":"XXX04$1",
         r"XXX Mai 20(\d{2})":"XXX05$1",
         r"XXX Juin 20(\d{2})":"XXX06$1",
         r"XXX Juillet 20(\d{2})":"XXX07$1",
         r"XXX Aout 20(\d{2})":"XXX08$1",
         r"XXX Septembre 20(\d{2})":"XXX09$1",
         r"XXX Octobre 20(\d{2})":"XXX10$1",
         r"XXX Novembre 20(\d{2})":"XXX11$1",
         r"XXX Decembre 20(\d{2})":"XXX12$1",
         }

cl = None
for k,v in dates.items():
    if cl is None:
        cl = F.regexp_replace(F.col("Vague"), k, v)
    else:
        cl = F.regexp_replace(cl, k, v)

cl = cl.alias("Vague")

This gives the same result, but works for any year from 2000 to 2099.
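
The patterns can be sanity-checked locally before running them on the cluster. The sketch below uses Python's re module rather than Spark, so it is only an approximation: Spark's regexp_replace uses Java regular expressions, where the captured group is written $1, while Python writes it \g<1>. The helper name reformat is hypothetical.

```python
import re

MONTHS = ["Janvier", "Fevrier", "Mars", "Avril", "Mai", "Juin",
          "Juillet", "Aout", "Septembre", "Octobre", "Novembre", "Decembre"]

# Same patterns as in the answer, with Python-style backreferences.
patterns = {
    rf"XXX {month} 20(\d{{2}})": rf"XXX{index:02d}\g<1>"
    for index, month in enumerate(MONTHS, start=1)
}


def reformat(value):
    """Apply every pattern in turn, mirroring the chained regexp_replace calls."""
    for pattern, replacement in patterns.items():
        value = re.sub(pattern, replacement, value)
    return value


print(reformat("XXX Fevrier 2021"))
print(reformat("22332"))
```

Values that match no pattern are returned unchanged. Because the patterns are not anchored with ^ and $, a match embedded in a longer string is also rewritten, which may or may not be desired.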

CodePudding user response:

Another solution is to use a map column (MapType) as a lookup table:

from pyspark.sql.functions import col, create_map, lit,split,concat
from itertools import chain
df = spark.createDataFrame([["XXX Fevrier 2021"], ["XXX Aout 2021"]], 
   schema="Vague string")

# Create a dict only for the given months
mapping = {
    "Janvier":"01",
    "Fevrier": "02",
    "Mars": "03",
    "Avril": "04",
    "Mai": "05",
    "Juin": "06",
    "Juillet": "07",
    "Aout": "08",
    "Septembre": "09",
    "Octobre": "10",
    "Novembre": "11",
    "Decembre": "12"}

# Create the mapping
mapping_expr = create_map([lit(x) for x in chain(*mapping.items())])

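# Split "XXX Fevrier 2021" into ["XXX", "Fevrier", "2021"] once and reuse the parts
parts = split(col("Vague"), ' ')

# Values that do not follow this three-part shape are not passed through
# unchanged; with default settings they come out as null.
res = df.withColumn("value", concat(
    parts[0],                        # prefix, e.g. "XXX"
    mapping_expr.getItem(parts[1]),  # month name -> two-digit month number
    parts[2].substr(3, 2)))          # last two digits of the year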

res.show()

which provides the expected result:

+----------------+-------+
|           Vague|  value|
+----------------+-------+
|XXX Fevrier 2021|XXX0221|
|   XXX Aout 2021|XXX0821|
+----------------+-------+