Why is UDF not running in parallel on available executors?

Time:04-26

I have a tiny Spark DataFrame that essentially pushes a string into a UDF. Because of .repartition(3), which matches the length of targets, I expect the processing inside run_sequential to be spread across the available executors, i.e. applied on 3 different executors.

The issue is that only 1 executor is used. How can I parallelise this processing so that my PySpark script assigns each element of targets to a different executor?

import pandas as pd
import pyspark.sql.functions as F

def run_parallel(config):
    # Build a UDF that closes over config

    def run_sequential(target):
        # process with target variable
        pass

    return F.udf(run_sequential)

targets = ["target_1", "target_2", "target_3"]

config = {}

# spark is the active SparkSession
pdf = spark.createDataFrame(pd.DataFrame({"targets": targets})).repartition(3)

pdf.withColumn(
    "apply_udf", run_parallel(config)("targets")
).collect()

Answer:

The issue here is that repartitioning a DataFrame does not guarantee that all the created partitions will be of the same size. With such a small number of records there is a pretty high chance that some of them will map into the same partition. Spark is not meant to process such small datasets and its algorithms are tailored to work efficiently with large amounts of data - if your dataset has 3 million records and you split it in 3 partitions of approximately 1 million records each, a difference of several records per partition will be insignificant in most cases. This is obviously not the case when repartitioning 3 records.

You can use df.rdd.glom().map(len).collect() to examine the size of the partitions before and after repartitioning to see how the distribution changes.

$ pyspark --master "local[3]"
...
>>> pdf = spark.createDataFrame([("target_1",), ("target_2",), ("target_3",)]).toDF("targets")
>>> pdf.rdd.glom().map(len).collect()
[1, 1, 1]
>>> pdf.repartition(3).rdd.glom().map(len).collect()
[0, 2, 1]

As you can see, the resulting partitioning is uneven, and in my case the first partition is actually empty. The irony is that the original DataFrame already has the desired property, and repartition() destroys it.
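To get a feel for why three single-record partitions so easily collide, here is a small pure-Python sketch. It is a simplified model, not Spark's actual code: it assumes each input partition deals its records out round-robin beginning at its own start position, and it simply enumerates every possible combination of starts. The helper round_robin is hypothetical and exists only for this illustration.

```python
from itertools import product

def round_robin(partition_sizes, starts, num_out):
    """Output partition sizes when each input partition deals its records
    round-robin, beginning at its own start position (simplified model)."""
    out = [0] * num_out
    for size, start in zip(partition_sizes, starts):
        for i in range(size):
            out[(start + i) % num_out] += 1
    return out

# Three input partitions holding one record each:
# enumerate every combination of start positions.
layouts = [round_robin([1, 1, 1], starts, 3)
           for starts in product(range(3), repeat=3)]
even = sum(1 for sizes in layouts if sizes == [1, 1, 1])
print(even, "of", len(layouts))  # 6 of 27

# One input partition holding all three records:
# consecutive records always land in consecutive output partitions.
single = [round_robin([3], (s,), 3) for s in range(3)]
print(single)  # [[1, 1, 1], [1, 1, 1], [1, 1, 1]]
```

Under this model only 6 of the 27 start combinations give an even [1, 1, 1] layout when the records begin in separate partitions, whereas a single input partition always spreads evenly.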

While your particular case is not what Spark typically targets, it is still possible to force three records into three partitions. All you need to do is provide an explicit partition key. RDDs have the zipWithIndex() method, which extends each record with its ID. The ID is the perfect partition key since it starts at 0 and increases by 1.

>>> new_df = (pdf
      .coalesce(1)  # not part of the solution - see below
      .rdd                         # Convert to RDD
      .zipWithIndex()              # Append ID to each record
      .map(lambda x: (x[1], x[0])) # Make record ID come first
      .partitionBy(3)              # Repartition
      .map(lambda x: x[1])         # Remove record ID
      .toDF())                     # Turn back into a dataframe
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]

In the above code, coalesce(1) is added only to demonstrate that the final partitioning is not influenced by the fact that pdf initially has one record in each partition.
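The reason consecutive IDs work so well as partition keys can be sketched without Spark. The snippet below assumes the usual hash-partitioning rule, partition = hash(key) % num_partitions, and relies on the fact that small non-negative integers hash to themselves in CPython. The helper assign_partitions is hypothetical and only models the idea. It is not the PySpark implementation.

```python
def assign_partitions(keys, num_partitions):
    # Assumed model of hash partitioning: partition = hash(key) % num_partitions.
    return [hash(k) % num_partitions for k in keys]

records = ["target_1", "target_2", "target_3"]

# enumerate() plays the role of zipWithIndex() followed by the swap: (id, record)
indexed = list(enumerate(records))

placement = assign_partitions([record_id for record_id, _ in indexed], 3)
print(placement)  # [0, 1, 2]

# Count how many records each of the three partitions receives.
sizes = [placement.count(p) for p in range(3)]
print(sizes)  # [1, 1, 1]
```

Because the IDs 0, 1 and 2 leave distinct remainders modulo 3, each record ends up in its own partition regardless of how the input was partitioned.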

A DataFrame-only solution is to first coalesce pdf to a single partition and then use repartition(3). With no partitioning column(s) provided, DataFrame.repartition() uses the round-robin partitioner and hence the desired partitioning will be achieved. You cannot simply do pdf.coalesce(1).repartition(3) since Catalyst (the Spark query optimisation engine) optimises out the coalesce operation, so a partitioning-dependent operation must be inserted in between. Adding a column containing F.monotonically_increasing_id() is a good candidate for such an operation.

>>> new_df = (pdf
      .coalesce(1)
      .withColumn("id", F.monotonically_increasing_id())
      .repartition(3))
>>> new_df.rdd.glom().map(len).collect()
[1, 1, 1]

Note that, unlike in the RDD-based solution, coalesce(1) is required as part of the solution.
