Order Spark RDD based on ordering in another RDD

Time:11-24

I have an RDD with strings like this (ordered in a specific way):

["A","B","C","D"]

And another RDD with lists like this:

["C","B","F","K"],
["B","A","Z","M"],
["X","T","D","C"]

I would like to order the elements in each list in the second RDD based on the order in which they appear in the first RDD. The order of the elements that do not appear in the first list is not of concern.

From the above example, I would like to get an RDD like this:

["B","C","F","K"],
["A","B","Z","M"],
["C","D","X","T"]

I know I am supposed to use a broadcast variable to broadcast the first RDD as I process each list in the second RDD. But I am very new to Spark/Scala (and functional programming in general) so I am not sure how to do this.

CodePudding user response:

I am assuming that the first RDD is small since you talk about broadcasting it. In that case you are right, broadcasting the ordering is a good way to solve your problem.

// generating data
val ordering_rdd = sc.parallelize(Seq("A","B","C","D"))
val other_rdd = sc.parallelize(Seq(
    Seq("C","B","F","K"),
    Seq("B","A","Z","M"),
    Seq("X","T","D","C")
))
// let's start by collecting the ordering onto the driver
val ordering = ordering_rdd.collect()
// Let's broadcast the list:
val ordering_br = sc.broadcast(ordering)

// Finally, let's use the ordering to sort your records:
val result = other_rdd
    .map( _.sortBy(x => {
        val index = ordering_br.value.indexOf(x)
        if(index == -1) Int.MaxValue else index
    }))

Note that indexOf returns -1 if the element is not found in the list. Left as is, every element that is not found would end up at the beginning. I understand that you want them at the end, so I replace -1 with Int.MaxValue.
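If the ordering list grows, calling indexOf for every element becomes a linear scan each time. One possible variation, sketched here on plain Scala collections so it runs without a Spark context, is to build an element-to-position map once and look elements up in it. The names RankSortSketch and sortByOrdering are made up for this illustration, and the sketch assumes the ordering contains no duplicates (with duplicates, the map would keep the last position, whereas indexOf returns the first).

```scala
object RankSortSketch {
  // Hypothetical helper (not part of the original answer): sorts each list
  // by the position of its elements in `ordering`.
  def sortByOrdering(ordering: Seq[String], lists: Seq[Seq[String]]): Seq[Seq[String]] = {
    // Build the element -> position lookup once.
    // Assumption: `ordering` has no duplicate elements.
    val rank: Map[String, Int] = ordering.zipWithIndex.toMap

    // Elements missing from `ordering` get Int.MaxValue and sort last.
    // sortBy is stable, so those elements keep their original relative order.
    lists.map(_.sortBy(x => rank.getOrElse(x, Int.MaxValue)))
  }

  def main(args: Array[String]): Unit = {
    val ordering = Seq("A", "B", "C", "D")
    val lists = Seq(
      Seq("C", "B", "F", "K"),
      Seq("B", "A", "Z", "M"),
      Seq("X", "T", "D", "C")
    )
    sortByOrdering(ordering, lists).foreach(println)
  }
}
```

In the Spark version, the same idea would presumably mean broadcasting ordering_rdd.collect().zipWithIndex.toMap instead of the array, and calling getOrElse(x, Int.MaxValue) on the broadcast value inside sortBy.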

Printing the result:

scala> result.collect().foreach(println)
List(B, C, F, K)
List(A, B, Z, M)
List(C, D, X, T)