Concatenate all columns and dump as json in Spark

Time:09-17

I have a table like below:

id product_id agent order_date
1 1 1 2021-09-13
1 2 1 2021-09-13
1 3 1 2021-09-13
2 1 1 2021-09-13
2 2 1 2021-09-13

How can I add a new column that holds all columns combined into a JSON object with custom keys, as shown below? The column-to-key mappings are:

  • id -> internal_id
  • product_id -> item_id
  • agent -> associate_id
  • order_date -> transaction_date

id product_id agent order_date wanted_column
1 1 1 2021-09-13 {internal_id: 1, item_id: 1, associate_id: 1, transaction_date: 2021-09-13}
1 2 1 2021-09-13 {internal_id: 1, item_id: 2, associate_id: 1, transaction_date: 2021-09-13}
1 3 1 2021-09-13 {internal_id: 1, item_id: 3, associate_id: 1, transaction_date: 2021-09-13}
2 1 1 2021-09-13 {internal_id: 2, item_id: 1, associate_id: 1, transaction_date: 2021-09-13}
2 2 1 2021-09-13 {internal_id: 2, item_id: 2, associate_id: 1, transaction_date: 2021-09-13}

Eventually the new column will be selected and dumped as JSON for further processing. I've been stuck on this for a while. I tried concat and map_from_entries, but neither worked.

I'm using the PySpark API.

CodePudding user response:

Use the approach below; the important functions are to_json and alias. I have written it in Scala, but I am sure it can be converted to Python.

import org.apache.spark.sql.functions.{col, struct, to_json}
import spark.implicits._

// just to create the dataset for the example you have given
val data = Seq(
  ("1", "1", "1", "2021-09-13"),
  ("1", "2", "1", "2021-09-13"),
  ("1", "3", "1", "2021-09-13"),
  ("2", "1", "1", "2021-09-13"),
  ("2", "2", "1", "2021-09-13"))

val dataset = data.toDF("id", "product_id", "agent", "order_date")

// create the key mapping programmatically by looping if it's not static
// note: a Map with more than four entries does not preserve insertion order;
// use a Seq of pairs if the order of the JSON keys matters
val keyMapping: Map[String, String] = Map("id" -> "internal_id", "product_id" -> "item_id", "agent" -> "associate_id", "order_date" -> "transaction_date")

// alias each source column to its JSON key
val columns = keyMapping.map { case (from, to) => col(from).alias(to) }.toSeq

// pack the aliased columns into a struct and serialize it to a JSON string
dataset.withColumn("wanted_column", to_json(struct(columns: _*))).show(false)

// output
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|id |product_id|agent|order_date|wanted_column                                                                       |
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|1  |1         |1    |2021-09-13|{"internal_id":"1","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |2         |1    |2021-09-13|{"internal_id":"1","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |3         |1    |2021-09-13|{"internal_id":"1","item_id":"3","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |1         |1    |2021-09-13|{"internal_id":"2","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |2         |1    |2021-09-13|{"internal_id":"2","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
+---+----------+-----+----------+------------------------------------------------------------------------------------+

CodePudding user response:

This question may already be answered in "How to save a dataframe into a json file with multiline option in pyspark".

I use Scala, so here is what I would do:

// mock up data, you dont need this if you already have a dataframe ready
val data = Seq(
("1", "1", "1", "2021-09-13"),
("1", "2", "1", "2021-09-13"),
("1", "3", "1", "2021-09-13"),
("2", "1", "1", "2021-09-13"),
("2", "2", "1", "2021-09-13")
)

import spark.implicits._
val df = data.toDF("id", "product_id", "agent", "order_date")

// mocking data is finished here, you now have a dataframe ready to work with

// changing column names to desired names
val new_df = df
.withColumnRenamed("id", "internal_id")
.withColumnRenamed("product_id", "item_id")
.withColumnRenamed("agent", "associate_id")
.withColumnRenamed("order_date", "transaction_date")

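// writing json: Spark creates a directory named vikas.json that contains part files,
// with one JSON object per line
new_df.write.json("vikas.json")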
