Home > other > Spark GraphX: iterative computation fails with java.lang.StackOverflowError when the number of iterations exceeds about 30
Spark GraphX: iterative computation fails with java.lang.StackOverflowError when the number of iterations exceeds about 30

Time:09-23

I have recently been implementing the Affinity Propagation (AP) parallel clustering algorithm on the Spark GraphX framework. Once the code was written, I found that when the number of iterations is set above roughly 30, the job fails at runtime with java.lang.StackOverflowError. From what I have read, the likely cause is that the lineage becomes too long as the iterations accumulate, but adding checkpoint calls had no effect: the same error still occurs. With a small number of iterations the job runs successfully. My environment is Spark 1.6.0 in local mode. If anyone has relevant experience, I would be very grateful for your help!

The main body of the algorithm is as follows:
Import org, apache hadoop. Conf. Configuration
Import org, apache hadoop. Fs. {FileSystem, Path}
The import org. Apache. Spark. The graphx. {Graph, TripletFields, VertexId}

/* *
* Created by LCJ on 2017.3.19.
*/
The class AP (
Val graphInput: Graph [VertexData EdgeData],
Val lambda: Double, val iterations: Int, val threshold: Int,
Val a: Double, val r: Double
)
Extends the Serializable {

Private var graph=this. GraphInput
Private val lam=this. Lambda
Private val maxIterNum=this. Iterations
Private val thresholdNum=this. Threshold
Private val avaiInitial=this. A
Private val respInitial=this. R


/**
 * Deletes `path` (recursively) if it already exists on the file system
 * obtained from a default Hadoop `Configuration`.
 *
 * @param path location to remove when present
 */
private def checkOutputPath(path: String): Unit = {
  val fs = FileSystem.get(new Configuration())
  // Build the Path once and reuse it for both the existence check and the delete.
  val target = new Path(path)
  if (fs.exists(target)) {
    // second argument `true` = recursive delete
    fs.delete(target, true)
  }
}


/**
 * Damped update: blends the previous value with the newly computed one,
 * weighting the previous value by `lam` and the new value by `1 - lam`.
 *
 * @param valuePrevious value from the previous iteration
 * @param valueNow      value computed in the current iteration
 * @return `lam * valuePrevious + (1 - lam) * valueNow`
 */
private def balance(valuePrevious: Double, valueNow: Double): Double =
  lam * valuePrevious + (1 - lam) * valueNow


/**
 * Collects the set of vertex ids currently selected as exemplars.
 *
 * Every edge sends `(dstId, avai + resp)` to its source vertex; for each source
 * the candidate with the largest score is kept, and the destination ids of the
 * winning candidates are gathered on the driver as a set.
 *
 * @param g graph whose edge attributes carry `avai` and `resp`
 * @return ids of the destinations that won at least one source vertex
 */
private def getExemplars(g: Graph[VertexData, EdgeData]): Set[VertexId] = {
  g.aggregateMessages[(VertexId, Double)](
    sendMsg = s => s.sendToSrc((s.dstId, s.attr.avai + s.attr.resp)),
    // strict `>`: on equal scores the right-hand candidate is kept
    mergeMsg = (a, b) => if (a._2 > b._2) a else b,
    TripletFields.EdgeOnly
  ).map(v => v._2._1).collect().toSet
}


Def the run () : the Unit={

Var prevG: Graph [VertexData EdgeData]=null

Var centers=Set [VertexId] ()

Var countForThreshold=0//clustering center (convergence) don't change the number of iterations count
Var flag=true
Var//the total number of iterations iterCount=0

For (_ & lt; - 1 to maxIterNum the if flag) {
//must use prevG to keep to the original figure first reference, and after the new produce, quick release old figure thoroughly.
//otherwise, after several iterations, there will be a memory leak, homework soon run out of cache space
PrevG=graph

//update r
Val updating_r=graph. AggregateMessages [Seq [Double]] (
SendMsg=s=& gt; S.s endToSrc (Seq (s.a TTR event. The similarity + s.a TTR event. Avai)),
MergeMsg=(a, b)=& gt; A + a + b,
TripletFields. EdgeOnly
)

Val updated_r=Graph (updating_r, Graph edges)
MapTriplets (t=& gt; {
Val filtered=t.s rcAttr. Filter (_!=(t.a TTR event. The similarity + t.a TTR event. Avai))
Val pool=
If (filtered. The size & lt; T.s rcAttr. Size - 1) filtered: + (t.a TTR event. The similarity + t.a TTR event. Avai)
The else filtered
Val maxValue=https://bbs.csdn.net/topics/if (pool. IsEmpty) 0.0 the else pool. The Max
EdgeData (t.a TTR event. Similarity, t.a TTR event. Avai, balance (resp. T.a TTR event., t.a TTR event. The similarity - maxValue))
}, TripletFields. Src)

Graph=graph. FromEdges (updated_r edges, VertexData (avaiInitial respInitial))

//update a
Val updating_a=graph. AggregateMessages [Double] (
SendMsg=s=& gt; {
If (s.s rcId!=spyware doctor stId) s.s endToDst (math. Max (resp. S.a TTR event., 0.0))
The else s.s endToDst (s.a TTR event. Resp)
},
MergeMsg=(a, b)=& gt; A + b,
TripletFields. EdgeOnly
)

Val updated_a=Graph (updating_a, Graph edges)
MapTriplets (t=& gt; {
If (t.s rcId!=t.d stId) {
Val a=the balance (
T.a TTR event. Avai,
Math. Min (0.0, t.d stAttr - math. Max (resp. T.a TTR event., 0.0))
)
EdgeData (t.a TTR event. Similarity, a, t.a TTR event, resp)
}
The else {
Val a=the balance (
T.a TTR event. Avai,
T.d stAttr - t.a TTR event. Resp
)
EdgeData (t.a TTR event. Similarity, a, t.a TTR event, resp)
}
}, TripletFields. Dst)

Graph=graph. FromEdges (updated_a edges, VertexData (avaiInitial respInitial)). The persist ()

IterCount +=1

//each update r and clustering center after a judge whether change
Val centersTmp=getExemplars (graph)
If (centers==centersTmp) {
CountForThreshold +=1
If (countForThreshold==thresholdNum) {
Flag=false
Println (" Break!" )
}
}
The else {
Centers=centersTmp
CountForThreshold=0
}

If (iterCount % 6==0) {
Graph. The cache ()
Graph. Checkpoint ()
Println (graph. NumVertices)
}

PrevG. UnpersistVertices ()
PrevG. Edges. Unpersist ()

}

Println (" algorithm is the total number of iterations: "+ iterCount)
Println (" don't change the number of clustering center: "+ countForThreshold)
Println (" Exemplars: "+ centers)

//determine each point to the center of the cluster distribution
Val clusterInfo=graph. AggregateMessages [(VertexId, Double, Double)] (
SendMsg=s=& gt; S.s endToSrc (spyware doctor stId, s.a TTR event. The similarity, s.a TTR event. Avai + s.a TTR event, resp)),
MergeMsg=(a, b)=& gt; If (a. _3 & gt; B. a else _3) b,
TripletFields. EdgeOnly
). The persist ()

//will point in the distribution of the text
CheckOutputPath (" member - the exemplar ")
ClusterInfo. MapValues (s=& gt; S. _1). SaveAsTextFile (" member - the exemplar ")

//calculate total error sum of squares
Val WSSSE=clusterInfo. The map (e=& gt; If (e. _1==e) _2) _1) 0.0 the else math.h pow (e) _2) _2, 2)). The sum ()
Println (" WSSSE: "+ WSSSE)

ClusterInfo. Unpersist ()
nullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnullnull
  • Related