I'm trying to build a recommender engine for a web store that sells household products. I'll keep this simple for the example. I'm using Scala and Spark.
I have a dataframe that contains 4 fields.
1. A unique ID (INT)
2. A ProductName (String)
3. A ProductPrice (Number)
4. ProductCategories (JSON field)
I also have a second dataframe, which contains sales of these products. I was looking at a movie recommender engine example using the MovieLens dataset, and it was fairly straightforward, so I was hoping to carry that over to this products example. However, the ProductCategories per product can look like the following:
e.g.
[{"id": 28, "type": "Home"}, {"id": 18, "type": "Kitchen"}, {"id": 53, "type": "Living"}]
[{"id": 28, "type": "Home"}, {"id": 23, "type": "Bathroom"}]
When I load the data into a dataframe, it comes through as a JSON string.
Here is some dataframe code:
val ProductsDF2 = ProductsDF1_temp1.select("id", "product_name", "product_price", "product_categories")
How do I change the code so the dataframe extracts the categories out of product_categories (the JSON column) into their own columns, like this:
| id | ProductName | ProductPrice | Home(Boolean) | Kitchen(Boolean) | Living(Boolean) | ... etc
I think this is what I need to achieve to get the recommender engine working. Kind of like one-hot encoding, I believe.
Any suggestions would be appreciated. I'm a little new to this.
Thanks Con
CodePudding user response:
Hope you find this useful
// Let us assume that the dataframe is defined as a variable df
// We need to parse the JSON array in the column_name column, which is stored as a JSON string
df.show
// output of df
+----------------------------------------------------+
|column_name                                         |
+----------------------------------------------------+
|[{"id":28,"type":"Home"},{"id":18,"type":"Kitchen"}]|
+----------------------------------------------------+
df.printSchema()
root
|-- column_name: string (nullable = true)
// Following is my answer
import org.apache.spark.sql.functions._
import org.json.JSONArray
// This UDF converts the JSON array string into an array of JSON object strings
val toArray = udf { (data: String) =>
  val jsonArray = new JSONArray(data)
  (0 until jsonArray.length)
    .map(i => jsonArray.getJSONObject(i).toString)
    .toArray
}
val df1 = df.withColumn("column_name", toArray(col("column_name")))
df1.printSchema()
root
|-- column_name: array (nullable = true)
| |-- element: string (containsNull = true)
val df2 = df1.withColumn("column_name", explode(col("column_name")))
// Infer the schema of the JSON objects from the exploded strings in df2
// (df1's column is an array, so its rows would not read back as valid JSON)
val schema = spark.read.json(df2.select("column_name").rdd.map(_.getString(0))).schema
df2.withColumn("column_name", from_json(col("column_name"), schema))
.select(col("column_name.*"))
.show(false)
// Final output
+---+-------+
|id |type   |
+---+-------+
|28 |Home   |
|18 |Kitchen|
+---+-------+
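To go from this exploded id/type form to the boolean layout you described, one option is a groupBy/pivot. Here is a minimal sketch, assuming the product columns (id, product_name, product_price) are carried through alongside the exploded category; the toy df above only has column_name, so those names are placeholders:
import org.apache.spark.sql.functions._
// Sketch only: `parsed` is assumed to keep the product columns next to the
// exploded category; the toy df in this answer has just column_name
val parsed = df2
  .withColumn("category", from_json(col("column_name"), schema))
  .withColumn("category_type", col("category.type"))
val oneHot = parsed
  .groupBy("id", "product_name", "product_price") // placeholder product columns
  .pivot("category_type")                         // one column per category name
  .agg(first(lit(true)))                          // true where the product has the category
  .na.fill(false)                                 // false everywhere else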
Edit:
Please include the following Maven dependency to import org.json.JSONArray:
<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20201115</version>
</dependency>