MongoDB aggregation using Spark


I have an aggregation problem using MongoDB with Spark. I'm not an expert on this, and I don't even know whether the operation I need exists.

I have several records, already aggregated by username. Then there is this special username called "-".

username1 data1:100
username2 data1:100
username3 data1:100
username4 data1:100
-         data1:55

Now, I need to add data1 from username "-" to every other user's data1.

username1 data1:155
username2 data1:155
username3 data1:155
username4 data1:155
-         data1:55

How can I do this using MongoDB with Spark?

Currently I have

rawDataRows.///some stuff//.groupBy("username")

which produces the output I wrote above. Can someone help me "merge" the data from username "-" into all the other users?

CodePudding user response:

Leaving the MongoDB read aside (it's a JSON document; I have no such source to work with where I am now, see https://docs.mongodb.com/spark-connector/current/python/read-from-mongodb/), then:
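For completeness, the MongoDB read itself might look roughly like the sketch below, assuming the MongoDB Spark Connector v10.x API; the connection URI, database, and collection names are placeholders you would replace with your own:

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("mongo-read-sketch")
  .getOrCreate()

// Hypothetical read via the MongoDB Spark Connector (v10.x "mongodb" format assumed).
// Replace the URI, database, and collection with your actual values.
val rawDataRows = spark.read
  .format("mongodb")
  .option("connection.uri", "mongodb://localhost:27017")
  .option("database", "mydb")
  .option("collection", "usernames")
  .load()
```

From there the dataframe can be processed exactly as in the example that follows.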

  • If the data is already aggregated and there is just one '-' record, all you have to do is:
    • filter that single record into its own dataframe, say DF1,
    • filter the original data where username <> '-' into another, say DF2,
    • JOIN the two and add the values,
    • select the columns needed.

No GROUP BY needed.

Like this:

// Simple data gen: JSON - need to read your MongoDB doc in yourself
import scala.collection.mutable.ListBuffer
val json_content1 = "{'username': 'hello', 'data1': 32}"
val json_content2 = "{'username': 'hello2', 'data1': 64}"
val json_content3 = "{'username': '-', 'data1': 100}"


var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2
json_seq += json_content3

// JSON in
val json_ds = json_seq.toDS()
json_ds.show(false)

// Make a DF
val df = spark.read.json(json_ds).cache()
df.show(false)

// Get the 2 sets of data; rename the '-' row's columns so they don't clash after the join
val df1 = df.filter($"username" === "-").toDF("dataNull", "usernameNull")
val df2 = df.filter($"username" =!= "-")

df1.show(false)
df2.show(false)

// Cross join the single '-' row onto every other row, add the values, and Bob's your uncle
val res = df1.join(df2).withColumn("data1", 'data1 + 'dataNull).drop('dataNull).drop('usernameNull)
res.show(false)

returns:

+-----+--------+
|data1|username|
+-----+--------+
|132  |hello   |
|164  |hello2  |
+-----+--------+

for my sample data.
