Spark assigning data according to a percentage

Time:10-24

Let's say I have multiple customer service tiers (Premium, Basic and Free), each of which has several dedicated support teams:

  • Premium => Purple, Blue, Green & Yellow
  • Basic => Red, White & Black
  • Free => Orange & Pink

Every client has a specific customer service tier.

What I'm trying to achieve is to assign a specific support team to each of my clients based on their customer service tier. I also want the teams of a given tier to handle roughly the same number of clients.

When there are fewer clients than teams in a particular customer service tier, I don't really care which teams get assigned. I just want each team to handle approximately the same number of clients.

In this example, my base Dataset looks like:

[image: base Dataset]

Some possible outputs:

[images: four possible outputs]

I can't figure out a way to do this with Spark. Can anyone help?

CodePudding user response:

Okay, let's solve this step by step. If I had to do this, I would first create a Map from each group to the number of values it needs, to avoid recomputing it for each row, something like this:

// groupsDF is the single column df in your question
// assumes a SparkSession named `spark` is in scope (needed for .as[...])
import spark.implicits._

val groupsAvailableCount: Map[Int, Long] =
  groupsDF
    .groupBy("group")
    .count()
    .as[(Int, Long)]
    .collect()
    .toMap
// result would be: Map(1 -> 2, 2 -> 3, 3 -> 4)

The second part is a bit trickier. As far as you explained, every value in a group has the same probability (in group 1, for example, all values have a probability of 0.25), but that might not hold in your actual problem. Either way, this is a permutation with probabilities, and you can decide how to sort things for your case. The good thing about this part is that it wraps the whole permutation-with-probabilities problem in a single function: you can change it as you wish, and the rest of your code is unaffected:

def getWithProbabilities(groupId: Int, probs: Map[String, Double]): List[String] = {
  // number of values to produce for this group (0 if the group is unknown)
  val take = groupsAvailableCount.getOrElse(groupId, 0L).toInt

  @annotation.tailrec
  def getValues(probabilities: List[(String, Double)]): List[String] = {
    // here is the logic, you can change the sorting and other stuff as you would want to
    if (probabilities.isEmpty) Nil // nothing to pick from; also keeps the doubling below from looping forever
    else if (take > probabilities.length) getValues(probabilities ::: probabilities) // repeat the candidates until there are enough
    else probabilities.sortBy(_._2).take(take).map(_._1)
  }
  getValues(probs.toList)
}
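If the probabilities are not all equal, one possible way to change the logic is to turn each probability into a share of the group's count. The sketch below is plain Scala and only an illustration: `ProportionalAllocation` and `allocate` are hypothetical names that are not part of the answer above, the rounding rule (largest remainder) is an assumption about what "according to a percentage" should mean, and value names are assumed to be distinct.

```scala
object ProportionalAllocation {
  // Hypothetical helper: split `total` slots across the values in proportion
  // to their weights, using largest-remainder rounding so that the result
  // always has exactly `total` elements.
  def allocate(total: Int, probs: List[(String, Double)]): List[String] = {
    val weightSum = probs.map(_._2).sum
    if (total <= 0 || probs.isEmpty || weightSum <= 0) Nil
    else {
      // Exact (fractional) share of each value.
      val shares = probs.map { case (v, p) => (v, total * p / weightSum) }
      // Every value first gets the integer part of its share.
      val base = shares.map { case (v, s) => (v, math.floor(s).toInt) }
      val leftover = total - base.map(_._2).sum
      // The remaining slots go to the values with the largest fractional parts.
      val bonus = shares
        .sortBy { case (_, s) => -(s - math.floor(s)) }
        .take(leftover)
        .map(_._1)
        .toSet
      base.flatMap { case (v, n) => List.fill(n + (if (bonus(v)) 1 else 0))(v) }
    }
  }
}

// 4 slots at 50% / 25% / 25%
println(ProportionalAllocation.allocate(4, List("a" -> 0.5, "b" -> 0.25, "c" -> 0.25)))
// prints List(a, a, b, c)
```

A function like this could stand in for the `sortBy(...).take(...)` line, with `total` taken from `groupsAvailableCount`, but whether proportional rounding is the right rule depends on your actual requirements.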

So now, thanks to that function abstraction, you can get the values you want for each group based on the spec. Almost done: you only need the distinct group ids, and for each of them you fetch the values and create your rows:

groupsDF.distinct.as[Int]  // distinct group ids
  .collect.toList          // collect as scala list, to do the mappings
  .map(groupId => groupId -> getWithProbabilities(groupId, spec(groupId))) // here you calculate the values for the group, based on spec map
  .flatMap {
    case (groupId, values) => values.map(groupId -> _)
  } // inline the results to create unique rows
  .toDF("group", "value") // create your dataframe out of it

And the result would be:

+-----+-------------+
|group|        value|
+-----+-------------+
|    1|  value_one_x|
|    1|  value_two_x|
|    2|  value_one_y|
|    2|  value_two_y|
|    2|value_three_y|
|    3|  value_one_z|
|    3|  value_two_z|
|    3|  value_one_z|
|    3|  value_two_z|
+-----+-------------+

Update:


To use the Spark API instead of collecting the group ids, you can use a UDF that simply calls the previous function:

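import org.apache.spark.sql.functions.{col, explode, udf}

// wraps getWithProbabilities so it can be applied to the "group" column
val getWithProbabilitiesUDF = udf { (groupId: Int) =>
  getWithProbabilities(groupId, spec(groupId))
}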

At the end, just call it on the groups dataframe you have:

groupsDF
  .distinct
  .select(
    col("group"),
    explode(getWithProbabilitiesUDF(col("group"))) as "value"
  )
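
For the balancing requirement in the question itself (every team of a tier handling about the same number of clients), the core idea can be sketched as a round-robin over each tier's teams. This is a plain Scala illustration under stated assumptions, not a Spark job: `RoundRobinTeams` and `assign` are hypothetical names, clients are assumed to be `(clientId, tier)` pairs, and the tier and team names are the ones listed in the question.

```scala
object RoundRobinTeams {
  // Tiers and teams as listed in the question.
  val teamsByTier: Map[String, Vector[String]] = Map(
    "Premium" -> Vector("Purple", "Blue", "Green", "Yellow"),
    "Basic"   -> Vector("Red", "White", "Black"),
    "Free"    -> Vector("Orange", "Pink")
  )

  // Walk through the clients of each tier and hand out that tier's teams in
  // turn, so team sizes within a tier differ by at most one client.
  // Throws NoSuchElementException for a tier missing from teamsByTier.
  def assign(clients: Seq[(String, String)]): Seq[(String, String, String)] =
    clients
      .groupBy { case (_, tier) => tier }
      .toSeq
      .flatMap { case (tier, members) =>
        val teams = teamsByTier(tier)
        members.zipWithIndex.map { case ((client, _), i) =>
          (client, tier, teams(i % teams.size))
        }
      }
}
```

With fewer clients than teams in a tier, the first teams in the list are simply the ones that get used. In Spark, the per-tier index could come from `row_number()` over a window partitioned by tier, which keeps the data distributed instead of collecting it to the driver.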