I have an aggregation problem using MongoDB with Spark. I'm not very experienced with this and I don't even know if the operation I need exists.
I have several records, already aggregated by username. Then there is a special username called "-":
username1 data1:100
username2 data1:100
username3 data1:100
username4 data1:100
- data1:55
Now, I need to add data1 from username "-" to every other user's data1:
username1 data1:155
username2 data1:155
username3 data1:155
username4 data1:155
- data1:55
How can I do this using MongoDB with Spark?
Currently I have
rawDataRows.///some stuff//.groupBy("username")
This produces the output above. Can someone help me "merge" the data from username "-" into all the other users?
CodePudding user response:
Leaving the MongoDB read aside (JSON document input; I have no such source to work with where I am now, see https://docs.mongodb.com/spark-connector/current/python/read-from-mongodb/), then:
- If you have already aggregated data and there is just one '-' record,
- all you have to do is filter this single record into another dataframe, DF1,
- then filter the original data <> '-' into, say, DF2,
- then JOIN and add the values,
- and select the columns needed.
No GROUP BY needed.
Like this:
// Simple data gen: JSON - need to read your MongoDB doc in yourself
import scala.collection.mutable.ListBuffer
val json_content1 = "{'username': 'hello', 'data1': 32}"
val json_content2 = "{'username': 'hello2', 'data1': 64}"
val json_content3 = "{'username': '-', 'data1': 100}"
var json_seq = new ListBuffer[String]()
json_seq += json_content1
json_seq += json_content2
json_seq += json_content3
// JSON in
val json_ds = json_seq.toDS()
json_ds.show(false)
// Make a DF
val df = spark.read.json(json_ds).cache()
df.show(false)
// Get the 2 sets of data
// toDF renames positionally: spark.read.json sorts columns alphabetically (data1, username)
val df1 = df.filter($"username" === "-" ).toDF("dataNull","usernameNull")
val df2 = df.filter($"username" =!= "-" )
df1.show(false)
df2.show(false)
// Add together and Bob's your uncle
val res = df1.join(df2).withColumn("data1", 'data1 + 'dataNull).drop('dataNull).drop('usernameNull)
res.show(false)
returns:
+-----+--------+
|data1|username|
+-----+--------+
|132  |hello   |
|164  |hello2  |
+-----+--------+
for my sample data in.
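The merge logic itself, stripped of Spark, can be sketched with plain Scala collections; the names below are illustrative, not from the original code:

```scala
// Records as (username, data1) pairs, mirroring the sample data above
val rows = Seq(("hello", 32), ("hello2", 64), ("-", 100))

// Pull out the value of the single "-" record (0 if it is absent)
val dashValue = rows.collectFirst { case ("-", v) => v }.getOrElse(0)

// Keep every other record and add the "-" value to each one
val merged = rows.filter(_._1 != "-").map { case (u, v) => (u, v + dashValue) }

println(merged) // List((hello,132), (hello2,164))
```

This is the same filter/join/add pattern as the dataframe version: isolate the single '-' row, then combine its value with every remaining row.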