I have a table like below:
id | product_id | agent | order_date |
---|---|---|---|
1 | 1 | 1 | 2021-09-13 |
1 | 2 | 1 | 2021-09-13 |
1 | 3 | 1 | 2021-09-13 |
2 | 1 | 1 | 2021-09-13 |
2 | 2 | 1 | 2021-09-13 |
How can I add a new column that contains all the columns serialized as JSON with custom keys, as shown below? The key mappings for the JSON are:
- id -> internal_id
- product_id -> item_id
- agent -> associate_id
- order_date -> transaction_date
id | product_id | agent | order_date | wanted_column |
---|---|---|---|---|
1 | 1 | 1 | 2021-09-13 | {internal_id: 1, item_id: 1, associate_id: 1, transaction_date: 2021-09-13} |
1 | 2 | 1 | 2021-09-13 | {internal_id: 1, item_id: 2, associate_id: 1, transaction_date: 2021-09-13} |
1 | 3 | 1 | 2021-09-13 | {internal_id: 1, item_id: 3, associate_id: 1, transaction_date: 2021-09-13} |
2 | 1 | 1 | 2021-09-13 | {internal_id: 2, item_id: 1, associate_id: 1, transaction_date: 2021-09-13} |
2 | 2 | 1 | 2021-09-13 | {internal_id: 2, item_id: 2, associate_id: 1, transaction_date: 2021-09-13} |
Eventually the new column will be selected and dumped as JSON for further processing. I've been stuck on this for a while. I tried concat and map_from_entries, but neither worked.
I'm using the PySpark API.
CodePudding user response:
Use the approach below. The important functions are `to_json` and `alias`. I have written it in Scala, but it should be straightforward to convert to Python.
import org.apache.spark.sql.functions.{col, struct, to_json}
import spark.implicits._
// Just to create the dataset for the example you have given
val data = Seq(
  ("1", "1", "1", "2021-09-13"),
  ("1", "2", "1", "2021-09-13"),
  ("1", "3", "1", "2021-09-13"),
  ("2", "1", "1", "2021-09-13"),
  ("2", "2", "1", "2021-09-13"))
val dataset = data.toDF("id", "product_id", "agent", "order_date")
// Build the key mapping programmatically (for example in a loop) if it is not static.
// A Seq of pairs keeps the JSON keys in the listed order; a Map with more than
// four entries does not guarantee iteration order.
val keyMapping: Seq[(String, String)] = Seq(
  "id" -> "internal_id",
  "product_id" -> "item_id",
  "agent" -> "associate_id",
  "order_date" -> "transaction_date")
// Alias each source column to its JSON key
val columns = keyMapping.map { case (from, to) => col(from).alias(to) }
// Pack the aliased columns into a struct and serialize it as a JSON string
dataset.withColumn("wanted_column", to_json(struct(columns: _*))).show(false)
//output
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|id |product_id|agent|order_date|wanted_column                                                                       |
+---+----------+-----+----------+------------------------------------------------------------------------------------+
|1  |1         |1    |2021-09-13|{"internal_id":"1","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |2         |1    |2021-09-13|{"internal_id":"1","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
|1  |3         |1    |2021-09-13|{"internal_id":"1","item_id":"3","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |1         |1    |2021-09-13|{"internal_id":"2","item_id":"1","associate_id":"1","transaction_date":"2021-09-13"}|
|2  |2         |1    |2021-09-13|{"internal_id":"2","item_id":"2","associate_id":"1","transaction_date":"2021-09-13"}|
+---+----------+-----+----------+------------------------------------------------------------------------------------+
CodePudding user response:
This question may already be answered here: How to save a dataframe into a json file with multiline option in pyspark
I use Scala, so here is what I would do:
// Mock up data; you don't need this if you already have a DataFrame ready
val data = Seq(
("1", "1", "1", "2021-09-13"),
("1", "2", "1", "2021-09-13"),
("1", "3", "1", "2021-09-13"),
("2", "1", "1", "2021-09-13"),
("2", "2", "1", "2021-09-13")
)
import spark.implicits._
val df = data.toDF("id", "product_id", "agent", "order_date")
// Mocking data is finished here; you now have a DataFrame ready to work with
// Rename the columns to the desired names
val new_df = df
  .withColumnRenamed("id", "internal_id")
  .withColumnRenamed("product_id", "item_id")
  .withColumnRenamed("agent", "associate_id")
  .withColumnRenamed("order_date", "transaction_date")
// Write as JSON. Spark creates a directory named "vikas.json" containing
// part files, with one JSON object per line.
new_df.write.json("vikas.json")