Spark Scala Conditionally add to agg

Time:03-29

Is it possible to add an aggregate conditionally in Spark Scala?

I would like to DRY out the following code by adding collect_set only when a flag is set.

Example:

    val aggDf = if (addId) groups.agg(
            count(lit(1)).as("Count"),
            percentile_approx($"waitTime",lit(0.5), lit(10000)),
            collect_set("Id").as("Ids")
        )
    else groups.agg(
            count(lit(1)).as("Count"),
            percentile_approx($"waitTime",lit(0.5), lit(10000))
        )

Maybe there is a better way of writing the whole thing.

Thanks.

CodePudding user response:

You can store the aggregate columns in a sequence and append to the sequence as required:

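    var aggCols = Seq(
      count(lit(1)).as("Count"),
      percentile_approx($"waitTime", lit(0.5), lit(10000))
    )
    // Append the optional aggregate only when requested.
    if (addId) aggCols = aggCols :+ collect_set("Id").as("Ids")

    // agg takes (Column, Column*), so pass the head and splat the tail.
    val aggDf = groups.agg(aggCols.head, aggCols.tail: _*)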
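
If you would rather avoid the `var`, the same idea can be written by concatenating an optional sequence. This is only a sketch: the `ConditionalAgg` object, the `aggColumns` and `aggregate` helpers, the `"queue"` grouping column and the `"MedianWaitTime"` alias are all made up for illustration, and it assumes Spark 3.1 or later, where `percentile_approx` is available in `org.apache.spark.sql.functions`.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.{col, collect_set, count, lit, percentile_approx}

object ConditionalAgg {
  // Hypothetical helper: builds the list of aggregates without mutation.
  def aggColumns(addId: Boolean): Seq[Column] = {
    val base = Seq(
      count(lit(1)).as("Count"),
      percentile_approx(col("waitTime"), lit(0.5), lit(10000)).as("MedianWaitTime")
    )
    // Contributes one extra column when the flag is set, nothing otherwise.
    val optional = if (addId) Seq(collect_set("Id").as("Ids")) else Seq.empty[Column]
    base ++ optional
  }

  // Hypothetical helper: "queue" stands in for whatever you actually group by.
  def aggregate(df: DataFrame, addId: Boolean): DataFrame = {
    val cols = aggColumns(addId)
    // base is never empty, so head/tail is safe here.
    df.groupBy("queue").agg(cols.head, cols.tail: _*)
  }
}
```

Calling `ConditionalAgg.aggregate(df, addId = true)` on a DataFrame with `queue`, `Id` and `waitTime` columns should give the same result as the `var` version. The head/tail split is only safe because the base sequence always holds at least one column.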