Actual data:
Array[(String, Array[(String, String)])] = Array((219495999, Array((1285059912, 1), (1285059912, 2), (1049968715, 1))))
I want to convert it into an RDD of the following form:
(219495999, 1285059912, 3) // the two 1285059912 entries are merged here
(219495999, 1049968715, 1)
How should I write this? Any help from the experts would be appreciated. I'm not sure I've described the problem clearly...
My code:
// result4 is the actual data shown above
val result5 = result4.flatMap { x =>
  val m = scala.collection.mutable.Map[String, Int]()
  x._2.foreach { y =>
    var tmp = m.getOrElse(y._1, 0)
    tmp = tmp + y._2.toInt
    m(y._1) = tmp
  }
  val z = for ((k, v) <- m) yield (x._1, k, v)
  z.toArray
}
// this code does not compile; the error is:
// error: polymorphic expression cannot be instantiated to expected type;
//  found   : [B >: (String, String, Int)]Array[B]
//  required: TraversableOnce[?]
//   z.toArray
CodePudding user response:
groupByKey?
CodePudding user response:
import org.apache.spark.{SparkConf, SparkContext}

def main(args: Array[String]): Unit = {
  // .forDse is specific to DataStax Enterprise; drop it on plain Spark
  val sparkConf = new SparkConf().setAppName(this.getClass.getName + "").forDse
  sparkConf.set("spark.cores.max", "2") // use at most 2 cores
    .set("spark.executor.memory", "512m") // each node uses 512 MB of memory
  val sc = new SparkContext(sparkConf)
  val base = createTestData(sc)
  // flatten to (outerKey#innerKey, count) so the pair can be reduced as one key
  val translate = base.flatMap {
    x =>
      x._2.map(f => (x._1 + "#" + f._1, f._2.toLong))
  }
  // sum the counts per composite key, then split the key back into its two parts
  val cc = translate.reduceByKey(_ + _).map(x => (x._1.split("#")(0), x._1.split("#")(1), x._2))
  cc.collect()
}
def createTestData(sc: SparkContext) = {
  val aa = Array((219495999, Array((1285059912, 1), (1285059912, 2), (1049968715, 1))))
  sc.parallelize(aa)
}
CodePudding user response:
val b = result4.flatMap { case (index, elements) =>
  elements.map { case (value1, value2) => (index, value1, value2) }
}
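Note that this flatMap only flattens the nested array: the two 1285059912 entries stay separate and the counts are still strings. A minimal sketch of the full flatten-then-merge step follows, written against plain Scala collections so it runs without a Spark cluster. The object name MergeCounts and the sample data are assumptions for illustration; on a real RDD the groupBy/sum step would presumably become reduceByKey(_ + _), as in the earlier reply.

```scala
object MergeCounts {
  // Hypothetical helper: flattens (outerKey, Array((innerKey, count))) records
  // and sums the counts for each (outerKey, innerKey) pair.
  def merge(data: Seq[(String, Array[(String, String)])]): Seq[(String, String, Int)] = {
    // Step 1: flatten to ((outerKey, innerKey), count), parsing the count as Int.
    val flattened: Seq[((String, String), Int)] = data.flatMap { case (index, elements) =>
      elements.toSeq.map { case (key, count) => ((index, key), count.toInt) }
    }
    // Step 2: sum counts per composite key (local stand-in for reduceByKey(_ + _)).
    val merged: Map[(String, String), Int] =
      flattened.groupBy(_._1).map { case (k, vs) => (k, vs.map(_._2).sum) }
    // Step 3: reshape to (outerKey, innerKey, total); sorted only to make the order stable.
    merged.toSeq.map { case ((index, key), total) => (index, key, total) }.sortBy(_._2)
  }

  def main(args: Array[String]): Unit = {
    // Sample data shaped like result4 in the question, as a local Seq instead of an RDD.
    val result4 = Seq(
      ("219495999", Array(("1285059912", "1"), ("1285059912", "2"), ("1049968715", "1")))
    )
    merge(result4).foreach(println)
  }
}
```

Using the tuple (index, key) as the key avoids the string concatenation and split on "#", which would break if a key ever contained that character.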