Spark: Distribute low number of compute-intensive tasks via UDF

Time:09-29

I have a Spark cluster with 5 worker nodes available for computation (in Azure Databricks). However, the task I need to solve differs from a typical Spark use case: instead of applying a simple task to millions of rows, I have to run a very complex operation on 60 rows of data.

My intention is to basically distribute the 60 tasks to the 5 workers, so that each worker processes 60/5 = 12 tasks. For this to work, I understand that the number of executors should equal the number of workers. This seems to be the case, as indicated by running

num_executors = len(spark.sparkContext._jsc.sc().statusTracker().getExecutorInfos()) - 1
# returns 5

Here is some naive pseudocode that runs, but only on a single worker:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

def my_complex_function(input_row):
  # this function uses all available cores (internally parallelized)
  # and takes about 15 minutes to complete per call if run on a
  # single worker node
  stuff = do_stuff(input_row)
  write_output_to_file(stuff)
  return debug_message

# pass the column value (z) through to the function
UDF_function = udf(lambda z: my_complex_function(z), StringType())
sdf = spark.createDataFrame(data=data,schema=["data"])

# sdf contains 60 rows and a single column, "data".
# "data" is just a path to blob storage file that needs to be processed.

sdf_new = sdf.withColumn("output", UDF_function(col("data")))
display(sdf_new) # <- Triggers the computation

As mentioned, this seems to run on a single worker only. I assume this is because my dataset is so small that it is not distributed across workers. I attempted to fix this with:

sdf = sdf.repartition(num_executors)

However, this still does not work. Only a single worker is used, as indicated by the Spark UI and my log files.

What do I need to set in order to simply have each of the executors run their share of tasks, in parallel?

CodePudding user response:

The display function tries to schedule as few tasks as possible to produce its output, which is capped at 1000 rows. It starts by scheduling a single task and hopes that will be enough. If it is not, it schedules 4, then 20, and so on. With tasks as long as yours, that takes a long time.
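To make that schedule concrete, here is a small pure-Python sketch of it. It is only a simplified model, not Spark's actual code: the function name partitions_per_round and the growth factor of 4 are assumptions inferred from the sequence 1, 4, 20 above, and it assumes that no round ever produces enough rows to stop early.

```python
def partitions_per_round(total_partitions, scale_up_factor=4):
    """Return how many partitions each successive round would scan.

    Simplified model (assumption): every round fails to fill the row cap,
    so the next round scans scale_up_factor times the number of
    partitions scanned so far, limited to the partitions that remain.
    """
    rounds = []
    scanned = 0
    while scanned < total_partitions:
        # First round tries a single partition; later rounds scale up.
        wanted = 1 if scanned == 0 else scanned * scale_up_factor
        batch = min(wanted, total_partitions - scanned)
        rounds.append(batch)
        scanned += batch
    return rounds


print(partitions_per_round(5))   # [1, 4]
print(partitions_per_round(30))  # [1, 4, 20, 5]
```

Under this model, a DataFrame repartitioned into 5 partitions runs one partition alone in the first round, and only then the remaining 4, which would match seeing a single busy worker for a long time.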

You can try collecting everything at the driver instead:

sdf_new.collect()

Collecting everything at the driver forces a complete evaluation of your DataFrame, so all of its partitions are scheduled at once.
