How to groupBy in Spark using two columns and in both directions

Time:11-10

I want to group the rows of my DataFrame by two columns, treating both directions of a pair as the same group. Here is a sample of the DataFrame:

val columns = Seq("src","dst")
val data = Seq(("A", "B"), ("B", "C"), ("C", "A"),("A", "B"), ("B", "A"), ("B", "A"),("A", "C"), ("B", "A"), ("C", "D"),("D", "C"), ("A", "C"), ("C", "A"))
val rdd = spark.sparkContext.parallelize(data)
val dff = spark.createDataFrame(rdd).toDF(columns:_*)

When I use a simple groupBy on the two columns, I get this result:

dff.groupBy("src","dst").count().show()
+---+---+-----+
|src|dst|count|
+---+---+-----+
|  B|  C|    1|
|  D|  C|    1|
|  A|  C|    2|
|  C|  A|    2|
|  C|  D|    1|
|  B|  A|    3|
|  A|  B|    2|
+---+---+-----+

I want to group rows whose src and dst are the same but in the opposite direction (for example, grouping A,C with C,A, and A,B with B,A). The desired result looks like this:

+---+---+-----+
|src|dst|count|
+---+---+-----+
|  B|  C|    1|
|  D|  C|    2|
|  A|  C|    4|
|  B|  A|    5|
+---+---+-----+

Any solutions?

CodePudding user response:

You can create a new array column containing your two group-by columns, sort that array, and group by it, as follows:

import org.apache.spark.sql.functions.{array, array_sort, col, count, first}

val result = dff.withColumn("group", array_sort(array(col("src"), col("dst"))))
  .groupBy("group")
  .agg(first("src").as("src"), first("dst").as("dst"), count("group").as("count"))
  .drop("group")

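The result DataFrame is as follows (first picks an arbitrary row from each group, so the src/dst orientation shown may differ between runs):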

+---+---+-----+
|src|dst|count|
+---+---+-----+
|A  |B  |5    |
|C  |A  |4    |
|B  |C  |1    |
|C  |D  |2    |
+---+---+-----+
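The normalization idea behind this answer can be checked without Spark. The following is only a sketch on plain Scala collections, not part of the original answer; the names `edges` and `counts` are made up for illustration. Each pair is put into a canonical order so that both directions share one key, and the rows are then counted per key.

```scala
// Sample pairs from the question, as plain Scala tuples (hypothetical name `edges`).
val edges = Seq(
  ("A", "B"), ("B", "C"), ("C", "A"), ("A", "B"), ("B", "A"), ("B", "A"),
  ("A", "C"), ("B", "A"), ("C", "D"), ("D", "C"), ("A", "C"), ("C", "A"))

// Order each pair so that (A, B) and (B, A) map to the same key, then count per key.
val counts: Map[(String, String), Int] = edges
  .map { case (src, dst) => if (src <= dst) (src, dst) else (dst, src) }
  .groupBy(identity)
  .map { case (key, rows) => key -> rows.size }

// counts: (A,B) -> 5, (A,C) -> 4, (B,C) -> 1, (C,D) -> 2
```

The totals match the Spark result above; only the orientation of each pair differs, because here the smaller value always comes first.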

If you don't have the array_sort function (available since Spark 2.4), you can use a when condition to reorder your two columns src and dst, as follows:

import org.apache.spark.sql.functions.{col, when}

val result = dff
  .withColumn("first", when(col("dst") < col("src"), col("src")).otherwise(col("dst")))
  .withColumn("second", when(col("dst") >= col("src"), col("src")).otherwise(col("dst")))
  .drop("src", "dst")
  .withColumnRenamed("first", "src")
  .withColumnRenamed("second", "dst")
  .groupBy("src", "dst")
  .count()

However, this second method only works for two columns.
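By contrast, the array-based approach from the first snippet extends to any number of columns. A minimal sketch, assuming Spark 2.4+ and columns of one common type; the helper name `countUnordered` and the output column name `key` are hypothetical and do not come from the original answer:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{array, array_sort, col}

// Hypothetical helper: counts rows that hold the same values in `cols`,
// regardless of which column each value appears in.
def countUnordered(df: DataFrame, cols: Seq[String]): DataFrame =
  df.groupBy(array_sort(array(cols.map(col): _*)).as("key")) // sorted array is the group key
    .count()
```

For the question's data, `countUnordered(dff, Seq("src", "dst"))` should return one row per unordered pair, with the pair held in the array column `key` instead of separate src and dst columns.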

CodePudding user response:

Another way, without using arrays, is to group by the least and greatest functions, like this:

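import org.apache.spark.sql.functions.{greatest, least}
import spark.implicits._ // for the $"..." column syntax (already imported in spark-shell)

dff.groupBy(
    least($"src", $"dst").as("src"),
    greatest($"src", $"dst").as("dst")
).count().show()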

//+---+---+-----+
//|src|dst|count|
//+---+---+-----+
//|  B|  C|    1|
//|  A|  C|    4|
//|  C|  D|    2|
//|  A|  B|    5|
//+---+---+-----+

You can swap least and greatest in the groupBy if you want the greatest value to be in the src column, for example.
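A sketch of that swapped variant, wrapped in a function so it stands on its own. The function name `countGreatestFirst` is hypothetical, and the input is assumed to have the src and dst columns from the question:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, greatest, least}

// Hypothetical helper: same grouping as above, but the greater value goes into src.
def countGreatestFirst(df: DataFrame): DataFrame =
  df.groupBy(
      greatest(col("src"), col("dst")).as("src"), // larger of the two values
      least(col("src"), col("dst")).as("dst")     // smaller of the two values
    ).count()
```

Calling `countGreatestFirst(dff)` on the sample data should give the rows (B, A, 5), (C, A, 4), (C, B, 1) and (D, C, 2), in no guaranteed order.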
