Suppose a dataframe with two columns, C1 and C2:

+---+---+
| C1| C2|
+---+---+
|  A|  B|
|  C|  D|
|  A|  E|
|  E|  F|
+---+---+
My goal is to collect all transitively connected values into arrays (intersections):

+-------------+
|intersections|
+-------------+
|[A, B, E, F] |
|[C, D]       |
+-------------+
How can this be done efficiently when the dataframe has a very large number of rows (~1 billion)?
CodePudding user response:
A solution is the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html). The idea is to treat every row as an edge of a graph and every distinct value as a vertex; the arrays you are after are then exactly the connected components of that graph.
DISCLAIMER: tested with Spark 2.4.4 and GraphFrames 0.7.0
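GraphFrames is not bundled with Spark, so it has to be added as an external package. One way to pull it in (the exact coordinates are an assumption and depend on your Spark/Scala versions; these match a Spark 2.4 / Scala 2.11 build):

spark-shell --packages graphframes:graphframes:0.7.0-spark2.4-s_2.11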
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.graphframes.GraphFrame
object SparkApp extends App {

  val spark = SparkSession
    .builder
    .appName("appName")
    .master("local[*]")
    .getOrCreate

  import spark.implicits._

  val dataTest =
    Seq(
      ("A", "B"),
      ("C", "D"),
      ("A", "E"),
      ("E", "F")
    ).toDF("C1", "C2")

  // setting a checkpoint directory is mandatory for GraphFrames' connectedComponents
  spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

  // build the graph: vertices are the distinct values of both columns,
  // edges are the original rows (GraphFrame expects "src" and "dst" columns)
  val graphTest: GraphFrame =
    GraphFrame(
      dataTest.select('C1 as "id").union(dataTest.select('C2 as "id")).distinct,
      dataTest.withColumnRenamed("C1", "src").withColumnRenamed("C2", "dst")
    )

  // label every vertex with the id of its connected component
  val graphComponentsTest = graphTest.connectedComponents.run()

  // collect the vertices of each component into one array
  val clustersResultTestDF =
    graphComponentsTest
      .groupBy("component")
      .agg(collect_list("id") as "intersections")

  clustersResultTestDF.show
}
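Note for very large inputs: connectedComponents checkpoints intermediate results, so the checkpoint directory should point at reliable storage (e.g. HDFS on a cluster). GraphFrames also lets you switch the underlying implementation; assuming the 0.7.0 API, the GraphX variant can be selected like this:

val graphComponentsTest = graphTest.connectedComponents.setAlgorithm("graphx").run()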
The output is:

+-------------+
|intersections|
+-------------+
|[A, B, E, F] |
|[C, D]       |
+-------------+
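If adding GraphFrames is not an option, the same result can be computed with GraphX, which ships with Spark itself. The sketch below is a minimal alternative under one loud assumption: String.hashCode is used to derive the Long vertex ids GraphX requires, and it is assumed collision-free for this data (not guaranteed in general; a safer route is zipWithUniqueId plus a join). It reuses dataTest and the spark session from the example above.

import org.apache.spark.graphx.{Edge, Graph}

// vertices: (Long id, original String value) for every distinct value
// ASSUMPTION: hashCode.toLong does not collide on this data set
val vertexRDD = dataTest
  .select('C1 as "id").union(dataTest.select('C2 as "id")).distinct
  .rdd.map(r => (r.getString(0).hashCode.toLong, r.getString(0)))

// edges: every original row connects its two hashed ids
val edgeRDD = dataTest.rdd.map(r =>
  Edge(r.getString(0).hashCode.toLong, r.getString(1).hashCode.toLong, ()))

// connectedComponents() labels each vertex with the smallest vertex id
// of its component; joining back recovers the original String values
Graph(vertexRDD, edgeRDD).connectedComponents().vertices
  .join(vertexRDD)                                     // (vid, (component, value))
  .map { case (_, (component, value)) => (component, value) }
  .toDF("component", "id")
  .groupBy("component")
  .agg(collect_list("id") as "intersections")
  .show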