Spark Dataset - "edit" parquet file for each row


Context

I am trying to use Spark/Scala to efficiently "edit" multiple parquet files (potentially 50k of them). The only edit that needs to be done is deletion, i.e. deleting records/rows, based on a given set of row IDs.

The parquet files are stored in s3 as a partitioned DataFrame where an example partition looks like this:

s3://mybucket/transformed/year=2021/month=11/day=02/*.snappy.parquet

Each partition can have upwards of 100 parquet files, each between 50 MB and 500 MB in size.

Inputs

We are given a Spark Dataset[MyClass] called filesToModify which has 2 columns:

  1. s3path: String = the complete s3 path to a parquet file in s3 that needs to be edited
  2. ids: Set[String] = a set of IDs (rows) that need to be deleted in the parquet file located at s3path

Example input dataset filesToModify:

s3path | ids
s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet | Set("a", "b")
s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet | Set("b")
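
For reference, MyClass can be thought of as a simple case class with these two fields (a sketch; the actual definition is not shown in the question):

// Hypothetical shape of MyClass, matching the two columns described above
final case class MyClass(s3path: String, ids: Set[String])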

Expected Behaviour

Given filesToModify, I want to take advantage of parallelism in Spark to do the following for each row:

  1. Load the parquet file located at row.s3path
  2. Filter so that we exclude any row whose id is in the set row.ids
  3. Count the number of deleted/excluded rows per id in row.ids (optional; see the sketch after this list)
  4. Save the filtered data back to the same row.s3path to overwrite the file
  5. Return the number of deleted rows (optional)
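
For the optional counting in steps 3 and 5, one way to do it (a sketch, assuming the DataModel case class used in the code further below exposes an id column) is to count the matching rows before filtering them out:

import org.apache.spark.sql.Dataset
import org.apache.spark.sql.functions.col

// Hypothetical helper: counts how many rows of `data` would be deleted,
// broken down per id, before the filtered result is written back
def countDeleted(data: Dataset[DataModel], ids: Set[String]): Map[String, Long] =
    data
        .filter(col("id").isInCollection(ids))  // rows that will be removed
        .groupBy("id")
        .count()
        .collect()
        .map(r => r.getString(0) -> r.getLong(1))
        .toMap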

What I have tried

I have tried using filesToModify.map(row => deleteIDs(row.s3path, row.ids)) where deleteIDs looks like this:

def deleteIDs(s3path: String, ids: Set[String]): Int = {
    import org.apache.spark.sql.functions.{col, not}
    import spark.implicits._
    val data = spark
        .read
        .parquet(s3path)
        .as[DataModel]

    val clean = data
        .filter(not(col("id").isInCollection(ids)))

    // write to a temp directory and then upload to s3 with same
    // prefix as original file to overwrite it
    writeToSingleFile(clean, s3path)

    1 // dummy output for simplicity (otherwise it should correspond to the number of deleted rows)
}

However, this leads to a NullPointerException when executed within the map operation. If I execute it alone, outside of the map block, it works, but I can't understand why it fails inside it (something to do with lazy evaluation?).

CodePudding user response:

The s3path and ids parameters that are passed to deleteIDs are not actually a String and a Set respectively; they are instead columns.

In order to operate on these values you can instead create a UDF, which accepts columns rather than intrinsic types, or you can collect your dataset if it is small enough to use the values in the deleteIDs function directly. The former is likely your best bet if you want to take advantage of Spark's parallelism.

You can read about UDFs in the Spark SQL documentation.
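
For illustration, a generic sketch of the UDF mechanism referred to here (a hypothetical spark-shell example; note that the UDF body runs on executors, so it could not itself call spark.read, which is the limitation the next answer describes):

import org.apache.spark.sql.functions.{col, udf}
import spark.implicits._

// Hypothetical frame mirroring filesToModify (the ids set becomes an array column)
val df = Seq(
    ("s3://mybucket/transformed/year=2021/month=11/day=02/part-1.snappy.parquet", Seq("a", "b")),
    ("s3://mybucket/transformed/year=2021/month=11/day=02/part-2.snappy.parquet", Seq("b"))
).toDF("s3path", "ids")

// The UDF is applied to columns, but its body receives plain Scala values
// (here a Seq[String]) rather than Column objects
val numIds = udf((ids: Seq[String]) => ids.size)

df.withColumn("numIds", numIds(col("ids"))).show(false)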

CodePudding user response:

You get a NullPointerException because you try to retrieve your Spark session from an executor.

It is not explicit, but to perform a Spark action, your deleteIDs function needs to retrieve the active Spark session. To do so, it calls the getActiveSession method on the SparkSession object. But when called from an executor, getActiveSession returns None, as stated in SparkSession's source code:

Returns the default SparkSession that is returned by the builder.

Note: Return None, when calling this function on executors

And thus a NullPointerException is thrown when your code starts using this None Spark session.

More generally, you can't create a dataset or use Spark transformations/actions inside the transformations of another dataset.
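
A small self-contained sketch of that behaviour (a hypothetical spark-shell example, relying only on the documented "Return None ... on executors" behaviour quoted above):

import org.apache.spark.sql.SparkSession
import spark.implicits._

// On the driver the active session is defined
println(SparkSession.getActiveSession.isDefined)  // true

// Inside a Dataset transformation the code runs in a task context, where
// getActiveSession is documented to return None, so a spark.read call made
// here would operate on a missing session and blow up
Seq(1, 2, 3).toDS()
    .map(_ => SparkSession.getActiveSession.isDefined)
    .show()  // expected: false for every row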

So I see two solutions for your problem:

  • either rewrite the deleteIDs function without using Spark, and modify your parquet files with a library such as parquet4s, for instance;
  • or transform filesToModify into a Scala collection and use Scala's map instead of Spark's, as sketched below.
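
A minimal sketch of the second option, assuming filesToModify is small enough to collect and reusing the deleteIDs function from the question on the driver:

// Bring the list of files to edit back to the driver; each element is a
// plain MyClass with real String / Set[String] values
val work: Array[MyClass] = filesToModify.collect()

// Plain Scala map on the driver: deleteIDs can now safely use the
// SparkSession, and each call is itself a distributed Spark job
val deletedCounts: Array[Int] = work.map(row => deleteIDs(row.s3path, row.ids))

If processing the files strictly one after another on the driver is too slow, these jobs can also be submitted concurrently (for example with Scala Futures), since a SparkSession can be shared by multiple threads on the driver.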