Asking for help with a Spark algorithm (Scala)

Time:09-26

I have a large table A (IP, value).
The IP column is the IP address converted to a decimal number, stored as a Double.

There is also a small table B (beginip, lastip, label).
beginip to lastip identifies a range of IP addresses.
label is the operator that the IP range belongs to (for example: mobile, unicom, telecom).

I need to do the join in the map phase over the large table, so that each IP address in the result is replaced by the operator it belongs to. Could someone share Spark Scala code that implements this? Thank you!

CodePudding user response:

val broadCastMap = sc.broadcast(iptable1)
val sighttp1 = sighttp.map(x => (x.remoteipv4, x)).mapPartitions({ iter =>
  val m = broadCastMap.value
  var f1 = "NULL"
  for {
    (k, v) <- iter
    val f = m.where(k + ">=fstip and lastip>=" + k)
    if (f.count() > 0)
    f1 = m.where(k + ">=fstip and lastip>=" + k).first.get(2).toString
  } yield (f1, v)
})
This is my implementation; I don't know whether it is optimal. It outputs only the rows that find a match; rows that cannot be matched are not output.
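
The dropped rows follow from how a guard inside a for comprehension works: it filters elements rather than supplying a default. A minimal sketch in plain Scala (no Spark), where the sample ranges, the row values, and the helper name labelOf are all made up for illustration:

```scala
object GuardVsDefault {
  // Hypothetical sample ranges: (fstip, lastip, label)
  val ranges: Seq[(Double, Double, String)] = Seq(
    (100.0, 199.0, "mobile"),
    (200.0, 299.0, "unicom")
  )

  // Label of the first range containing ip, if any
  def labelOf(ip: Double): Option[String] =
    ranges.collectFirst { case (lo, hi, label) if lo <= ip && ip <= hi => label }

  // Hypothetical rows keyed by IP
  val rows: Seq[(Double, String)] = Seq((150.0, "row-a"), (999.0, "row-b"))

  // A generator over an empty result behaves like a guard: unmatched rows are dropped
  val matchedOnly: Seq[(String, String)] =
    for { (ip, row) <- rows; label <- labelOf(ip).toSeq } yield (label, row)

  // Mapping with a default keeps every row, using "NULL" for unmatched ones
  val allRows: Seq[(String, String)] =
    rows.map { case (ip, row) => (labelOf(ip).getOrElse("NULL"), row) }
}
```

If unmatched rows should appear in the output with a "NULL" label, the second form is the one to adapt.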

CodePudding user response:

Why not just use SQLContext?

CodePudding user response:

 
val sqlCtx = new SQLContext(sc)
sqlCtx.read.jdbc(XXX).registerTempTable("t_a")
sqlCtx.read.jdbc(XXX).registerTempTable("t_b")

val res = sqlCtx.sql("SELECT a.ip, b.label FROM t_a a JOIN t_b b ON a.ip BETWEEN b.beginip AND b.endip")
res.show(100)

CodePudding user response:

Thanks for the advice, link0007! I tried your approach,
but my large table is very big, with billions of rows, while the small table has only tens of thousands of rows. In this situation the reduce-side join runs very slowly, so I think a map-side join would be more efficient. My source code is below. I run it on the cluster from spark-shell with --master yarn-client and get a NullPointerException, and I don't know why. The line that fails is val f = m.where(k + ">=fstip and lastip>=" + k); if I change that line to a constant, f = "test", it runs normally.

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.SparkContext
import org.apache.spark.sql.SQLContext

object test1 {
  case class HTTPXDR(localipv4: String, remoteipv4: Double, host: String, wholeurl: String)
  case class iptables(fstip: Double, lastip: Double, label: String)
  def main(args: Array[String]) {
    if (args.length != 1) {
      println("Usage: ebda <master>")
      return
    }
    val sc = new SparkContext(args(0), "ebda-spark", System.getenv("SPARK_HOME"))
    val hiveCtx = new HiveContext(sc)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._
    // Large table: the dotted IP in column 2 is packed into one decimal Double
    val sighttp = hiveCtx.sql("SELECT * FROM httpnew where reportdate='20160510'").map(h => HTTPXDR(h(1).toString(), h(2).toString().split("\\.")(0).toDouble * 1000000000 + h(2).toString().split("\\.")(1).toDouble * 1000000 + h(2).toString().split("\\.")(2).toDouble * 1000 + h(2).toString().split("\\.")(3).toDouble, h(3).toString(), h(4).toString()))
    // Small table of IP ranges, kept as a DataFrame
    val iptable1 = hiveCtx.sql("select * from configuration.wwipdsttable").map(ip => iptables(ip(0).toString().toDouble, ip(1).toString().toDouble, ip(3).toString())).toDF()
    val broadCastMap = sc.broadcast(iptable1)
    val sighttp1 = sighttp.map(line => (line.remoteipv4, line)).mapPartitions({ iter =>
      var f1 = "NULL"
      val m = broadCastMap.value
      for {
        (k, v) <- iter
        // The NullPointerException is thrown on this line
        val f = m.where(k + ">=fstip and lastip>=" + k)
        if (f.count > 0)
        f1 = f.first.get(2).toString
      } yield (f1, v)
    }).toDF()
    sighttp1.show(10)
  }
}

CodePudding user response:

Broadcasting rdd.collect() does not raise the NullPointerException, but once the DataFrame has been collected the result is no longer a DataFrame, so the DataFrame where operator can't be used. Does this mean broadcast variables don't support DataFrames?
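
The likely cause is that a DataFrame is a handle tied to the driver's context, so its operators cannot be called inside tasks running on executors. One possible way around it, sketched under assumptions, is to broadcast plain collected data and search it with ordinary Scala code. The names IpRange, prepare, and lookup below are hypothetical, the ranges are assumed not to overlap, and the Spark wiring in the trailing comment is an untested assumption based on the names used in this thread:

```scala
object RangeLookup {
  // Mirrors the thread's iptables case class; plain data that can be broadcast
  final case class IpRange(fstip: Double, lastip: Double, label: String)

  // Sort once on the driver; assumes the ranges do not overlap
  def prepare(ranges: Seq[IpRange]): Array[IpRange] =
    ranges.sortBy(_.fstip).toArray

  // Binary search for the last range whose fstip <= ip, then check its lastip
  def lookup(sorted: Array[IpRange], ip: Double): Option[String] = {
    var lo = 0
    var hi = sorted.length - 1
    var found = -1
    while (lo <= hi) {
      val mid = (lo + hi) >>> 1
      if (sorted(mid).fstip <= ip) { found = mid; lo = mid + 1 }
      else hi = mid - 1
    }
    if (found >= 0 && sorted(found).lastip >= ip) Some(sorted(found).label) else None
  }

  // Assumed Spark wiring (not run here), using the names from the thread:
  //   val bc = sc.broadcast(prepare(iptable1.collect().map(r =>
  //     IpRange(r.getDouble(0), r.getDouble(1), r.getString(2)))))
  //   sighttp.map(x => (lookup(bc.value, x.remoteipv4).getOrElse("NULL"), x))
}
```

With tens of thousands of ranges, each lookup takes roughly 15 to 17 comparisons, and the broadcast array is shipped to each executor only once.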

CodePudding user response:

I think there is still room to optimize the algorithm itself.
Right now the small table holds IP ranges. If you expanded those ranges exhaustively into class C or class B network segments, how large would the table become? I don't expect it to be astronomical.
That turns the range scan into an equality lookup in a map, which is what map/reduce does best; I estimate it would be several orders of magnitude faster.
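
A minimal sketch of that idea in plain Scala, assuming every range starts and ends on a class C (/24) boundary; ranges that split a /24 between operators would need extra handling. It uses the standard 32-bit encoding of an IPv4 address rather than the decimal encoding from the question, and the names toLong, expand, and lookup are hypothetical:

```scala
object PrefixMap {
  // Standard 32-bit encoding of a dotted IPv4 address
  // (an assumption: not the decimal encoding used earlier in the thread)
  def toLong(ip: String): Long =
    ip.split("\\.").foldLeft(0L)((acc, part) => acc * 256 + part.toLong)

  // Expand each (beginip, lastip, label) range into one key per /24 segment.
  // Assumes every range starts and ends on a /24 boundary.
  def expand(ranges: Seq[(String, String, String)]): Map[Long, String] =
    ranges.flatMap { case (begin, last, label) =>
      ((toLong(begin) >> 8) to (toLong(last) >> 8)).map(_ -> label)
    }.toMap

  // Equality lookup on the top 24 bits of the address
  def lookup(table: Map[Long, String], ip: String): Option[String] =
    table.get(toLong(ip) >> 8)
}
```

A class B segment expands to 256 keys, so the size of the resulting map depends on how wide the ranges in the small table are.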