Is there a way to use a map function inside another map function?

Time:02-11

I need to translate the values in one RDD using the values in another RDD. Something like this:

rdd1=sc.parallelize(['aa,bb','cc,dd','ee,aa'])
rdd2=sc.parallelize(['aa,1' , 'bb,2' , 'cc,3' , 'dd,4' , 'ee,5'])
result: ['1,2', '3,4' , '5,1']

I tried the following map function:

    def mymap(c):
        src = c[0]
        dst = c[2]
        srcnew = rdd2.lookup(src)[0]
        dstnew = rdd2.lookup(dst)[0]
        return (srcnew, dstnew)

    rdd3 = rdd1.map(mymap)

But I get the following error:

It appears that you are attempting to broadcast an RDD or reference an RDD from an action or transformation. RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(lambda x: rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

The only solution I have come up with is to use collect(), but my dataset is huge. Is there a way to look up values from one RDD inside a transformation on another RDD?

CodePudding user response:

You should be able to accomplish this with some simple joins and maps. Note that rdd1 and rdd2 below have been adjusted with a map function so that each entry is split into a two-element list, which join treats as a (key, value) pair.

rdd1=sc.parallelize(['aa,bb','cc,dd','ee,ff']) \
.map(lambda x: x.split(','))
rdd2=sc.parallelize(['aa,1','bb,2','cc,3','dd,4','ee,5','ff,6']) \
.map(lambda x: x.split(','))

rdd3 = rdd1 \
.join(rdd2) \
.map(lambda x: x[1]) \
.join(rdd2) \
.map(lambda x: x[1]) \
.map(lambda x: ','.join(x))

rdd3.foreach(print)

# outputs (line order is not guaranteed)
# 1,2
# 3,4
# 5,6

The above will

  1. join the rdds to resolve the first entry in rdd1 giving
    [('aa', ('bb', '1')), ('cc', ('dd', '3')), ('ee', ('ff', '5'))]
  2. map the above rdd to just be the values from the join giving
    [('bb', '1'), ('dd', '3'), ('ff', '5')]
  3. join with rdd2 again to resolve the second entry originally from rdd1 giving
    [('bb', ('1', '2')), ('dd', ('3', '4')), ('ff', ('5', '6'))]
  4. again, map the above rdd to just be the values from the join giving
    [('1', '2'), ('3', '4'), ('5', '6')]
  5. map the tuples to strings, joining on ',' giving
    ['1,2', '3,4', '5,6']

Steps 4 and 5 can obviously be combined as lambda x: ','.join(x[1]); I've kept them separate above for clarity.
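If you want to check the intermediate values without a Spark session, the steps above can be modelled in plain Python. This is only a rough sketch: local_join is a hypothetical helper written for this illustration (it is not part of PySpark) and only mimics the inner-join behaviour of RDD.join on a local list. It uses the data from the question, including the 'ee,aa' entry, and combines steps 4 and 5.

```python
# Plain-Python model of the two-join pipeline; no Spark required.
# local_join is a hypothetical helper for illustration, not a PySpark API.

def local_join(left, right):
    """Inner join of two lists of (key, value) pairs, similar to RDD.join."""
    index = {}
    for key, value in right:
        index.setdefault(key, []).append(value)
    return [(key, (lv, rv)) for key, lv in left for rv in index.get(key, [])]

pairs = [s.split(',') for s in ['aa,bb', 'cc,dd', 'ee,aa']]
lookup = [s.split(',') for s in ['aa,1', 'bb,2', 'cc,3', 'dd,4', 'ee,5']]

step1 = local_join(pairs, lookup)        # step 1: resolve the first entry
step2 = [value for _, value in step1]    # step 2: keep (second entry, first id)
step3 = local_join(step2, lookup)        # step 3: resolve the second entry
result = [','.join(value) for _, value in step3]  # steps 4 and 5 combined

print(result)  # ['1,2', '3,4', '5,1']
```

Unlike this local model, Spark does not guarantee the order of the joined rows, so the real output may list the same strings in a different order.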

For a more complex example where you need to preserve an initial key see this answer

CodePudding user response:

If your dataset is huge, you can use the pair RDD join operation and leverage Spark's distributed computation.

In your case, you can look up the values with two joins as follows:

    import org.apache.spark.rdd.RDD

    val rdd1 = sc.parallelize(Seq("aa,bb", "cc,dd", "ee,ff"))
    val rdd2 = sc.parallelize(Seq("aa,1", "bb,2", "cc,3", "dd,4", "ee,5", "ff,6"))

    // Transform rdd to Key Value (KV) RDD (a.k.a Pair RDD)
    val rdd1KV: RDD[(String, String)] = rdd1.map(x => {
      val elements = x.split(",")
      (elements.head, elements.last)
    })
    val rdd2KV: RDD[(String, String)] = rdd2.map({ x =>
      val elements = x.split(",")
      (elements.head, elements.last)
    })


    /*
    Join and format the RDD as KV
    (ee,(ff,5)) => (ff,5)
    (aa,(bb,1)) => (bb,1)
    (cc,(dd,3)) => (dd,3)
     */
    val rddTmp = rdd1KV.join(rdd2KV).map(x => x._2)

    /* Second join and format the output
    (dd,(3,4)) => 3,4
    (ff,(5,6)) => 5,6
    (bb,(1,2)) => 1,2
     */
    rddTmp.join(rdd2KV).map(x => s"${x._2._1},${x._2._2}").foreach(println)