I have a Spark Streaming application where I want to do some data transformations before my main operation, but the transformation involves some data validation.
When the validation fails, I want to log the failure cases and then proceed with the rest.
Currently, I have code like this:
def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???

val validationResults = values.map(validate)

validationResults.foreachRDD { rdd =>
  rdd.foreach {
    case Left(error) => logger.error(error)
    case _           =>
  }
}

val validatedValues: DStream[MyCaseClass] =
  validationResults.mapPartitions { partition =>
    partition.collect { case Right(record) => record }
  }
This currently works, but it feels like I'm doing something wrong.
Questions
As far as I understand, this will perform the validation twice, which is potentially wasteful.

- Is it correct to use values.map(validate).persist() to solve that problem?
- Even if I persist the values, it still iterates and pattern matches on all the elements twice. It feels like there should be some method I can use to solve this. On a regular Scala collection, I might use some of the cats TraverseFilter API, or with fs2.Stream an evalMapFilter. What DStream API can I use for that? Maybe something with mapPartitions?
CodePudding user response:
I would say that the best way to tackle this is to take advantage of the fact that flatMap also accepts a function returning Option (the standard library converts an Option to an Iterable implicitly, so a None is simply dropped):
def values: DStream[String] = ???
def validate(element: String): Either[String, MyCaseClass] = ???

val validatedValues: DStream[MyCaseClass] =
  values.map(validate).flatMap {
    case Left(error) =>
      logger.error(error)
      None

    case Right(record) =>
      Some(record)
  }
You can also be a little more verbose using mapPartitions, which should be a little more efficient.
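For reference, a mapPartitions version could look roughly like the sketch below. This is not from the answer above; it assumes the same values, validate and logger as in the question, and relies on the same implicit Option-to-Iterable conversion inside the iterator's flatMap.

val validatedValues: DStream[MyCaseClass] =
  values.mapPartitions { partition =>
    // Validate lazily, once per element, as the partition iterator is consumed.
    partition.map(validate).flatMap {
      case Left(error) =>
        logger.error(error)
        None
      case Right(record) =>
        Some(record)
    }
  }

Any gain here comes from doing per-partition setup work (such as obtaining a logger) once per partition instead of once per element; for a cheap validation function the difference is usually small.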
CodePudding user response:
The 'best' option here depends a bit on the rest of your Spark job and your version of Spark.
Ideally you'd pick a mechanism natively understood by Catalyst. The Spark 3 Dataset observe API (with an accompanying listener) may be what you're looking for there. (I haven't seen many examples of it used in the wild, but it seems like this was the motivation behind adding it.)
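A minimal sketch of the observe route could look something like this. It assumes you are on the Dataset/Structured Streaming API rather than DStreams, and that the data already carries an invalid_reason column as described in the next paragraph; the dataset name `validated` and the metric names are made up for illustration.

import org.apache.spark.sql.functions._

val observed = validated.observe(
  "validation",                                     // metric group name
  count(lit(1)).as("total_count"),                  // all records seen
  count(col("invalid_reason")).as("invalid_count")  // count() ignores NULLs
)
// For a streaming query the values surface in
// StreamingQueryProgress.observedMetrics, keyed by the observation name;
// for a batch job they are delivered to a registered QueryExecutionListener.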
In pure Spark SQL, one option is to add a new column with the results of validation, e.g. a column named invalid_reason which is NULL if the record is valid, or some [enumerated] string containing the reason the record failed validation. At this point, you likely want to persist/cache the dataset before doing a groupBy/count/collect/log operation, then filter where invalid_reason is null on the persisted dataframe and continue with the rest of the processing.
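As a rough sketch of that approach (in a batch or foreachBatch context; the input df, the value column and the validity checks here are made up for illustration):

import org.apache.spark.sql.functions._
import org.apache.spark.storage.StorageLevel

// Hypothetical checks: a record is valid when `value` is non-empty and numeric.
val withValidation = df.withColumn(
  "invalid_reason",
  when(length(trim(col("value"))) === 0, lit("EMPTY"))
    .when(col("value").cast("long").isNull, lit("NOT_NUMERIC"))
    .otherwise(lit(null).cast("string"))
).persist(StorageLevel.MEMORY_AND_DISK)

// Fork 1: log the failures, grouped by reason.
withValidation
  .filter(col("invalid_reason").isNotNull)
  .groupBy("invalid_reason")
  .count()
  .collect()
  .foreach(row => logger.error(s"validation failures: $row"))

// Fork 2: continue the rest of the job on the valid records only.
val valid = withValidation.filter(col("invalid_reason").isNull)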
tl;dr: consider adding a validation column rather than just applying a 'validate' function. You then 'fork' the processing there: log the records that have the invalid column set, and run the rest of your job on the records that don't. It adds some volume to your dataframe, but doesn't require processing the same records twice.