Apache Spark: collect into an array intersections


Suppose a DataFrame with two columns, C1 and C2:

+---+---+
| C1| C2|
+---+---+
|  A|  B|
|  C|  D|
|  A|  E|
|  E|  F|
+---+---+

My goal is to collect the transitively connected values into arrays:

+-------------+
|intersections|
+-------------+
|[A, B, E, F] |
|[C, D]       |
+-------------+

How can this be done efficiently if the DataFrame has a large number of rows (~1 billion)?

CodePudding user response:

A solution is the GraphFrames library (https://graphframes.github.io/graphframes/docs/_site/index.html): treat each distinct value as a vertex and each row as an edge, and the desired arrays are exactly the connected components of that graph.

DISCLAIMER: tested with Spark 2.4.4 and GraphFrames 0.7.0
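GraphFrames is not bundled with Spark, so it has to be added as a dependency. A build.sbt fragment that should pull it in; the coordinates are an assumption for the Spark 2.4 / Scala 2.11 build, so check the GraphFrames site for the artifact matching your versions:

```scala
// build.sbt fragment (hypothetical coordinates; pick the artifact that matches
// your Spark and Scala versions on the GraphFrames / spark-packages site)
resolvers += "SparkPackages" at "https://repos.spark-packages.org/"
libraryDependencies += "graphframes" % "graphframes" % "0.7.0-spark2.4-s_2.11"
```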

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

import org.graphframes.GraphFrame

object SparkApp extends App {

  val appName = "appName"
  val master = "local[*]"

  val spark = SparkSession
    .builder
    .appName(appName)
    .master(master)
    .getOrCreate

  import spark.implicits._

  val dataTest =
    Seq(
      ("A", "B"),
      ("C", "D"),
      ("A", "E"),
      ("E", "F")
    ).toDF("C1", "C2")

  // a checkpoint directory is mandatory for GraphFrames' connectedComponents
  spark.sparkContext.setCheckpointDir("/some/path/hdfs/test_checkpoints")

  // build the graph: distinct values become vertices, rows become edges
  val graphTest: GraphFrame =
    GraphFrame(
      dataTest.select('C1 as "id").union(dataTest.select('C2 as "id")).distinct,
      dataTest.withColumnRenamed("C1", "src").withColumnRenamed("C2", "dst")
    )

  // assigns each vertex a "component" id; connected vertices share the same id
  val graphComponentsTest = graphTest.connectedComponents.run()

  // group vertex ids by component to get the arrays
  val clustersResultTestDF =
    graphComponentsTest
      .groupBy("component")
      .agg(collect_list("id") as "intersections")

  clustersResultTestDF.show
}

The output is:

+-------------+
|intersections|
+-------------+
|[A, B, E, F] |
|[C, D]       |
+-------------+
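For intuition, the grouping that connectedComponents computes is the same one a union-find over the edge list would produce. A minimal plain-Scala sketch (no Spark; the object and method names are illustrative, not part of any API):

```scala
// Conceptual sketch: group values into connected components with union-find.
// This is what GraphFrames' connectedComponents computes, at toy scale.
object ComponentsSketch {
  def components(edges: Seq[(String, String)]): Set[Set[String]] = {
    val parent = scala.collection.mutable.Map.empty[String, String]

    // find the root representative of x, with path compression
    def find(x: String): String = {
      val p = parent.getOrElseUpdate(x, x)
      if (p == x) x
      else { val root = find(p); parent(x) = root; root }
    }

    // merge the components containing a and b
    def union(a: String, b: String): Unit = parent(find(a)) = find(b)

    edges.foreach { case (a, b) => union(a, b) }
    // group every seen vertex by its root representative
    parent.keys.toSeq.groupBy(find).values.map(_.toSet).toSet
  }

  def main(args: Array[String]): Unit = {
    val edges = Seq(("A", "B"), ("C", "D"), ("A", "E"), ("E", "F"))
    // yields the two groups {A, B, E, F} and {C, D}
    println(ComponentsSketch.components(edges))
  }
}
```

This runs in memory on one machine; the point of GraphFrames is to do the same computation distributed across a cluster, which is why it is the right tool at ~1 billion rows.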