How do I implement the following logic with a DataFrame?

Time:09-23

I am using Spark 1.6.

The original data is as follows:

 
val baseDF = hiveContext.sql(newSql)


 
ID ID2 C1 C2 C3 C4 C5 ... C33
CM1 a 1 1 1 0 0
CM2 a 1 1 0 0 1
CM3 a 1 0 1 1 1
CM4 a 1 1 1 1 1
CM5 a 1 1 1 1 1
K2 b 1 0 0 1 1 1
K3 b 1 1 1 1 1
K1 b 1 0 0 0 0 1





 


The target output:

ID ID2 C1 C2 C3 C4 C5 ... C33
CM1 a 1 1 1 0 0
CM2 a 1 0 0 0 0
CM3 a 0 0 0 0 1
CM4 a 0 0 0 0 0
CM5 a 0 0 0 0 0
K1 b 1 0 0 0 0 1
K2 b 1 0 0 1 1 0
K3 b 1 1 0 0 0



The logic: group by ID2, and within each group find the row with the smallest ID whose Cn is 1. That row keeps Cn = 1; every other row in the group gets Cn = 0.
There are at most 33 Cn columns (C1 to C33).
With a case class this would exceed the allowed number of fields.

This is my current attempt. The result is wrong; I have probably used the wrong approach, and the processing should happen after the grouping.
 

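import scala.collection.mutable
import scala.collection.mutable.{HashMap, MultiMap, Set}
import scala.collection.immutable.TreeSet

case class testGoods(ID: String, ID2: String, C1: String, C2: String)

// ID2 -> set of (ID, C1, C2)
val cartMap = new HashMap[String, Set[(String, String, String)]] with MultiMap[String, (String, String, String)]

val baseDF = hiveContext.sql(newSql)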

val testRDD = baseDF.mapPartitions(partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID = record.getString(0)
    if (ID != null && ID != "null") {
      val ID2 = record.getString(1)
      val C1 = record.getString(2)
      val C2 = record.getString(3)
      cartMap.addBinding(ID2, (ID, C1, C2))
    }
  }
  cartMap.iterator
})

val recordList = new mutable.ListBuffer[testGoods]()
val testRDD1 = testRDD.mapPartitions(partition => {
  while (partition.hasNext) {
    val record = partition.next()
    val ID2 = record._1
    val recordRow = record._2
    // Sort the (ID, C1, C2) tuples of this ID2 group
    val sortedRecordRow = TreeSet[(String, String, String)]() ++ recordRow
    val dic = new mutable.HashMap[String, String]

    for (v <- sortedRecordRow) {
      val ID = v._1
      val C1 = v._2
      val C2 = v._3

      if (dic.contains(ID2)) {
        // Note: dic.get returns Option[String], so the equals check below never matches "1"
        val goodsValue = dic.get(ID2)
        if ("1".equals(goodsValue)) {
          recordList.append(new testGoods(ID, ID2, "0", C2))
        } else {
          dic.put(ID2, C1)
          recordList.append(new testGoods(ID, ID2, C1, C2))
        }
      } else {
        dic.put(ID2, C1)
        recordList.append(new testGoods(ID, ID2, C1, C2))
      }
    }
  }
  recordList.iterator
})


val searchToItemNewDF = hiveContext.createDataFrame(testRDD1).repartition(1)
  .rdd.map { r => r.mkString("\t") }
  .saveAsTextFile("/data/testRDD1")


I searched online and found that people usually implement this with groupBy + agg, so I tried that as well:

 
baseDF.groupBy("ID2").agg(collect_list($"ID"), collect_list($"ID2"))


But after trying for a long time I could not get it to work, so I went back to the mapPartitions logic.
How can I implement this with a DataFrame?
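
One possible way to express this with DataFrame operations only is a window function: partition by ID2, order by ID, and keep a 1 only where the running sum of the column first reaches 1. The sketch below is a suggestion, not something tested against the data in this thread. It assumes the flag columns are literally named C1 to C33 and hold 0/1 values (numbers or numeric strings), and that the DataFrame comes from a HiveContext, which Spark 1.6 requires for window functions. `FirstOneWithWindow`, `flagCols` and `keepFirstOne` are names made up for illustration.

```scala
import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, sum, when}

object FirstOneWithWindow {
  // Assumption: the flag columns are named C1..C33.
  val flagCols: Seq[String] = (1 to 33).map(i => s"C$i")

  def keepFirstOne(df: DataFrame): DataFrame = {
    // Frame: from the first row of the ID2 group up to the current row, ordered by ID.
    // Long.MinValue stands for "unbounded preceding" in the Spark 1.6 API.
    val w = Window.partitionBy("ID2").orderBy("ID").rowsBetween(Long.MinValue, 0)

    val fixed: Seq[Column] = flagCols.map { name =>
      val v = col(name).cast("int")
      // The running sum equals 1 exactly on the first row of the group whose flag is 1.
      when((v === 1) && (sum(v).over(w) === 1), 1).otherwise(0).as(name)
    }

    // Keep the key columns and replace every flag column with its adjusted version.
    df.select((Seq(col("ID"), col("ID2")) ++ fixed): _*)
  }
}
```

With this, `FirstOneWithWindow.keepFirstOne(baseDF)` would return ID, ID2 and the adjusted flag columns. Since the column list is built from a range, no 33-field case class is needed. The ordering is a plain string comparison on ID, so IDs such as K2 and K11 sort lexicographically.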


CodePudding user response:

I don't know what you are trying to do. You need a more detailed example. Or is my Chinese not good enough?

CodePudding user response:

Sorry, I did not explain it clearly. Here is new test data.

The goal is to first group by ID2, then sort by ID in ascending order within each group.
Then, for each of the columns C1 ... C33 separately, keep the 1 on the first ID where it appears and set the others to 0.

The original data:
ID ID2 C1 C2 ... C33
CM1 a 1 0
CM2 a 1 0
K13 f 1 0 0
CM4 a 1 1
CM5 a 1 1
K14 f 0 1
K2 b 0 1
K3 b 1 1
K11 f 1 0 0
K12 f 1 0 0
K1 b 1 0
CM3 a 1 0


The target output:
ID ID2 C1 C2
CM1 a 1 0
CM2 a 0 0
CM3 a 0 1
CM4 a 0 0
CM5 a 0 0
K1 b 1 0
K2 b 0 1
K3 b 1 0 0
K11 f 1 0 0
K12 f 1 0 0
K13 f 1 0 0
K14 f 0 1
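
To make the rule described in this reply concrete without Spark, here is a small sketch on plain Scala collections: group by ID2, sort by ID, and for each flag column keep only the first 1. `FlagRow` and `FirstOnePerGroup` are hypothetical names, the flags are assumed to be 0/1 integers, and IDs are compared as plain strings.

```scala
// Hypothetical row type: an ID, a group key, and the 0/1 flags (C1, C2, ...).
final case class FlagRow(id: String, id2: String, flags: Vector[Int])

object FirstOnePerGroup {
  /** Within each id2 group, sorted by id, keep a 1 only on the first row that has it. */
  def keepFirstOne(rows: Seq[FlagRow]): Seq[FlagRow] =
    rows.groupBy(_.id2).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      val sorted = group.sortBy(_.id) // plain string ordering on id
      val width = sorted.map(_.flags.length).max
      // seen(i) becomes true once column i has produced its first 1 in this group
      val seen = Array.fill(width)(false)
      sorted.map { row =>
        val fixed = row.flags.zipWithIndex.map { case (v, i) =>
          if (v == 1 && !seen(i)) { seen(i) = true; 1 } else 0
        }
        row.copy(flags = fixed)
      }
    }
}
```

For example, the rows (CM2, a, 1 1), (CM1, a, 1 0), (CM3, a, 0 1) come back as CM1 with 1 0, CM2 with 0 1, and CM3 with 0 0: each column keeps its 1 only on the smallest ID that had it.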



CodePudding user response:

 
import scala.util.Random
import org.apache.spark.sql.Row

case class TestGoods(ID: String, ID2: String, C1: Int, C2: Int)
case class CompressedRows(id2: String, ids: Array[String], indexs: Array[Int])

def main(args: Array[String]) {
  // `session` is the SparkSession, defined elsewhere
  import session.implicits._
  val random = new Random(10)
  // Random 0/1 generator for the test flags
  val generator: () => Int = () => {
    if (random.nextBoolean()) 1
    else 0
  }
  val datasource = (1 to 100).map(idx => TestGoods(s"ID-${idx}", s"ID2-${idx % 20}", generator(), generator()))

  val df = session.sparkContext.makeRDD(datasource).toDF("id", "id2", "c1", "c2")
  df.show(false)
  import session.implicits.newStringEncoder
  df.groupByKey { case Row(_, id2: String, _, _) => id2 }
    .mapGroups {
      case (id2, rows) =>
        // One (row index, found) pair per flag column; the initial value is reconstructed from garbled text
        val cs: Array[(Int, Boolean)] = (1 to 2).map(_ => (-1, false)).toArray
        val sorted = rows.toList.sortBy(row => row(0).hashCode())
        (0 until sorted.length).foreach { idx =>
          val row = sorted(idx)
          row match {
            case Row(id, id2, c1: Int, c2: Int) =>
              // Remember the index of the first row where each flag is 1
              if (!cs(0)._2 && c1 == 1) cs(0) = (idx -> true)
              if (!cs(1)._2 && c2 == 1) cs(1) = (idx -> true)
          }
        }
        // The operator between `sorted.length` and `1` was lost in the source text; `+` is assumed here
        val compressedIdx = cs.map(r => if (r._2) r._1 else sorted.length + 1)