How to perform grouping in Spark Scala when the key is not the same

Time:05-04

I want to calculate the total of the amounts for the secondary accounts and compare it with the amount of their primary account. In the following example, an account number that begins with "643" is a primary account, and the accounts that appear after it are its secondary accounts. The next account that begins with "643" starts a new group, followed by its own secondary accounts. I want to group the records that comprise each primary account and its secondary accounts, and compute the sum of the amounts for the secondary accounts.

Input:
Account, Amount
643100, 10000 ---- primary account
234100, 4000 ---- secondary account
231300, 1000 ---- secondary account
136400, 5000 ---- secondary account
643841, 20000 ---- next group
562100, 10000
432176, 10000
643304, 40000 ---- next group
124562, 20000
234567, 5000

Output:
Account, Amount, sumofsecamounts
643100, 10000, 10000
643841, 20000, 20000
643304, 40000, 25000

CodePudding user response:

There are a few problems you need to solve first.

  1. Row order is not guaranteed on insert. I ran into this while trying to reproduce your problem and had to add a column to make sure my data looked like yours.
    1. If your table really does have this order already, you are likely OK.
  2. A window needs a column to order the data on. Here we use monotonically_increasing_id to get that column.
  3. You need to partition the data by primary account.
    1. You can do this with a math trick: mark secondary accounts with 0 so that they do not change a rolling sum. The running total then changes only at each primary account, which gives you a partition key.
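
To make the trick in point 3 concrete, here is a minimal sketch in plain Scala collections, without Spark. It assumes the rows are already in the order shown in the question and that the first row is a primary account. The names RollingSumSketch, isPrimary, markers and groupKeys are made up for this illustration.

```scala
// Plain Scala collections, no Spark: illustrates the running-sum grouping idea only.
object RollingSumSketch {
  // (account, amount) rows in their original order, copied from the question.
  val rows: Seq[(Int, Int)] = Seq(
    (643100, 10000), (234100, 4000), (231300, 1000), (136400, 5000),
    (643841, 20000), (562100, 10000), (432176, 10000),
    (643304, 40000), (124562, 20000), (234567, 5000)
  )

  // Assumption taken from the question: a primary account number starts with "643".
  def isPrimary(account: Int): Boolean = account.toString.startsWith("643")

  // Marker per row: the account number for a primary, 0 for a secondary.
  val markers: Seq[Long] =
    rows.map { case (account, _) => if (isPrimary(account)) account.toLong else 0L }

  // Running sum of the markers. It changes only on a primary row,
  // so every row of a group ends up with the same key.
  val groupKeys: Seq[Long] = markers.scanLeft(0L)(_ + _).tail

  // Per group: (primary account, primary amount, sum of secondary amounts).
  // Groups without a primary row (possible only before the first primary) are skipped.
  val result: Seq[(Int, Int, Int)] =
    rows.zip(groupKeys)
      .groupBy { case (_, key) => key }
      .toSeq
      .sortBy { case (key, _) => key }
      .flatMap { case (_, members) =>
        val groupRows = members.map { case (row, _) => row }
        val secondarySum =
          groupRows.filterNot { case (account, _) => isPrimary(account) }.map(_._2).sum
        groupRows
          .find { case (account, _) => isPrimary(account) }
          .map { case (account, amount) => (account, amount, secondarySum) }
      }

  def main(args: Array[String]): Unit = result.foreach(println)
}
```

The running keys here are 643100, 1286941 and 1930245, the same values that show up in the partition column of the Spark output further down.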


import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// I had to add the order column so my data would land in the format you had.
// It is backtick-quoted because order is also a SQL keyword.
spark.sql("create table accounts ( Account int, Amount int, `order` int)")
spark.sql("insert into accounts values ( 643100, 10000,1 ), (234100, 4000,2),(231300, 1000,3),(136400, 5000,4),(643841, 20000,5),(562100, 10000,6),(432176, 10000,7),(643304, 40000,8 ),(124562, 20000,9),(234567, 5000,10)")
// A window needs a column to order by, so this gives us an ordered field to use. Link to documentation below.
val increasing = spark.sql("select * from accounts order by `order`").withColumn("monotonically_increasing_id", monotonically_increasing_id())
increasing.show()
// No partition columns: the whole table is one ordered window.
val windowSpec = Window.partitionBy().orderBy("monotonically_increasing_id")
increasing.withColumn("is_Primary",
  when( col("Account").cast("string").startsWith("643"), col("Account") ) // primary accounts begin with "643"
  .otherwise( 0 ) // This identifies primary & secondary accounts
).withColumn("partition",
  sum("is_Primary").over(windowSpec)  // rolling sum trick: the 0's for secondary accounts leave the sum unchanged, so it partitions the data.
).groupBy(
  col("partition"), //groups primary and secondary
  col("is_Primary") //splits primary vs secondary totals.
).agg( sum("Amount") ).show()
+---------+----------+-----------+
|partition|is_Primary|sum(Amount)|
+---------+----------+-----------+
|   643100|    643100|      10000|
|   643100|         0|      10000|
|  1286941|    643841|      20000|
|  1286941|         0|      20000|
|  1930245|    643304|      40000|
|  1930245|         0|      25000|
+---------+----------+-----------+

Once you have this, you can self join the result on the partition column if you really want one row per primary account, as you described.
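
A rough sketch of that self join, assuming the grouped result has the three columns shown in the output table (partition, is_Primary, sum(Amount)). The object name SelfJoinSketch, the function secondaryTotals and the intermediate column name total are made up for this example, and the local SparkSession is only there so the snippet runs on its own.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

object SelfJoinSketch {
  // Expects the columns partition, is_Primary and "sum(Amount)" from the grouped result.
  def secondaryTotals(totals: DataFrame): DataFrame = {
    // Rename by exact name so the parentheses in the column name are not an issue.
    val named = totals.withColumnRenamed("sum(Amount)", "total")

    // Rows with a non-zero is_Primary carry the primary account and its own amount.
    val primaries = named
      .filter(col("is_Primary") =!= 0)
      .select(col("partition"), col("is_Primary").as("Account"), col("total").as("Amount"))

    // Rows with is_Primary = 0 carry the summed secondary amounts of the same group.
    val secondaries = named
      .filter(col("is_Primary") === 0)
      .select(col("partition"), col("total").as("sumofsecamounts"))

    // Left join: a primary without any secondary accounts is kept, with a null sum.
    primaries
      .join(secondaries, Seq("partition"), "left")
      .orderBy("partition")
      .select("Account", "Amount", "sumofsecamounts")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("self-join-sketch").getOrCreate()
    import spark.implicits._

    // The six rows of the grouped output above, typed as Spark would produce them.
    val totals = Seq(
      (643100L, 643100, 10000L), (643100L, 0, 10000L),
      (1286941L, 643841, 20000L), (1286941L, 0, 20000L),
      (1930245L, 643304, 40000L), (1930245L, 0, 25000L)
    ).toDF("partition", "is_Primary", "sum(Amount)")

    secondaryTotals(totals).show()
    spark.stop()
  }
}
```

Ordering by partition restores the original group order here only because the running sum grows with each primary account.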

http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.monotonically_increasing_id
