Home > Back-end >  Best practice in Spark to filter dataframe, execute different actions on resulted dataframes and the
Best practice in Spark to filter dataframe, execute different actions on resulted dataframes and the

Time:10-01

Since I am new to Spark I would like to ask a question about a pattern that I am using in Spark but don't know if it's a bad practice ( splitting a dataframe in two based on a filter, execute different actions on them and then joining them back ). To give an example, having dataframe df:

val dfFalse = df.filter(col === false).distinct()

val dfTrue = df.filter(col === true).join(otherDf, Seq(id), "left_anti").distinct()

val newDf = dfFalse union dfTrue

Since my original dataframe has milions of rows I am curious if this filtering twice is a bad practice and I should use some other pattern in Spark which I may not be aware of. In other cases I even need to do 3,4 filters and then apply different actions to individual data frames and then union them all back.

Kind regards,

CodePudding user response:

There are several key points to take into account when you start to use Spark to process big amounts of data in order to analyze our performance:

Spark parallelism depends of the number of partitions that you have in your distributed memory representations(RDD or Dataframes). That means that the process(Spark actions) will be executed in parallel across the cluster. But note that there are two main different kind of transformations: Narrow transformations and wide transformations. The former represent operations that will be executed without shuffle, so the data don´t need to be reallocated in different partitions thus avoiding data transfer among workers. Consider that if you what to perform a distinct by a specific key Spark must reorganize the data in order to detect the duplicates. Take a look to the doc.

Regarding doing more or less filter transformations:

Spark is based on a lazy evaluation model, it means that all the transformations that you executes on a dataframe are not going to be executed unless you call an action, for example a write operation. And the Spark optimizer evaluates your transformations in order to create an optimized execution plan. So, if you have five or six filter operations it will never traverse the dataframe six times(in contrast to other dataframe frameworks). The optimizer will take your filtering operations and will create one. Here some details.

So have in mind that Spark is a distributed in memory data processor and it is a must to know these details because you can spawn hundreds of cores over hundred of Gbs.

CodePudding user response:

The efficiency of this approach highly depends on the ability to reduce the amount of the overlapped data files that are scanned by both the splits. I will focus on two techniques that allow data-skipping:

  1. Partitions - if the predicates are based on a partitioned column, only the necessary data will be scanned, based on the condition. In your case, if you split the original dataframe into 2 based on a partitioned column filtering, each dataframe will scan only the corresponding portion of the data. In this case, your approach will be perform really well as no data will be scanned twice.

  2. Filter/predicate pushdown - data stored in a format supporting filter pushdown (Parquet for example) allows reading only the files that contains records with values matching the condition. In case that the values of the filtered column are distributed across many files, the filter pushdown will be inefficient since the data is skipped on a file basis and if a certain file contains values for both the splits, it will be scanned twice. Writing the data sorted by the filtered column might improve the efficiency of the filter pushdown (on read) by gathering the same values into a fewer amount of files.

As long as you manage to split your dataframe, using the above techniques, and minimize the amount of the overlap between the splits, this approach will be more efficient.

  • Related