JSON in a DataFrame field in Scala in a recommender engine


I'm trying to build a recommender engine for a web store that sells household products; I'll keep the example simple here. I'm using Scala and Spark.

I have a DataFrame that contains four fields:

1. A unique ID (INT)
2. A ProductName (String)
3. A ProductPrice (Number)
4. ProductCategories (JSON field)

I also have a second DataFrame that contains sales of these products. I was looking at a movie recommender engine example built on the MovieLens dataset, and it was fairly straightforward, so I was hoping to carry that approach over to this products example. However, the ProductCategories for a given product can look like the following:

e.g.

[{'id': 28, 'type': 'Home'}, {'id': 18, 'type': 'Kitchen'}, {'id': 53, 'type': 'Living'}]
[{'id': 28, 'type': 'Home'}, {'id': 23, 'type': 'Bathroom'}]

When I load the data into a DataFrame, product_categories comes through as a JSON string.

Here is some DataFrame code:

val ProductsDF2 = ProductsDF1_temp1.select(
  ProductsDF1_temp1.col("id"),
  ProductsDF1_temp1.col("product_name"),
  ProductsDF1_temp1.col("product_price"),
  ProductsDF1_temp1.col("product_categories")
)

How do I change this code so that the DataFrame extracts the categories out of product_categories (the JSON field) into their own columns, like this:

| id | ProductName | ProductPrice | Home(Boolean) | Kitchen(Boolean) | Living(Boolean) | ... etc

I think this is what I need to achieve to get the recommender engine working. It's essentially one-hot encoding, I believe.

Any suggestions would be appreciated. I'm a little new to this.

Thanks, Con

CodePudding user response:

Hope you find this useful

// Let us assume the DataFrame is defined in a variable df
// We need to parse the JSON array data in the column_name column, which arrives as a JSON string

df.show

// output of df

+----------------------------------------------------+
|column_name                                         |
+----------------------------------------------------+
|[{"id":28,"type":"Home"},{"id":18,"type":"Kitchen"}]|
+----------------------------------------------------+

df.printSchema()

root
 |-- column_name: string (nullable = true)

// Following is my answer

import org.apache.spark.sql.functions._
import org.json.JSONArray

// This UDF converts the JSON array string into an array of JSON object strings

val toArray = udf { (data: String) =>
  // Parse the JSON array and return each element as its own JSON string
  val jsonArray = new JSONArray(data)
  (0 until jsonArray.length)
    .map(i => jsonArray.getJSONObject(i).toString)
    .toArray
}

val df1 = df.withColumn("column_name", toArray(col("column_name")))

df1.printSchema()

root
 |-- column_name: array (nullable = true)
 |    |-- element: string (containsNull = true)
   
val df2 = df1.withColumn("column_name", explode(col("column_name")))

// Extract the fields from the JSON strings; the schema is inferred from the exploded data itself

val schema = spark.read.json(df2.select("column_name").rdd.map(_.getString(0))).schema

df2.withColumn("column_name", from_json(col("column_name"), schema))
   .select(col("column_name.*"))
   .show(false)
   
// Final output

+---+-------+
|id |type   |
+---+-------+
|28 |Home   |
|18 |Kitchen|
+---+-------+
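
From this long id/type layout you can get back to the wide, one-column-per-category layout asked for in the question with a pivot. This is only a minimal sketch, assuming you kept the product's own key (here called product_id, a hypothetical column name) alongside the exploded categories in a DataFrame called exploded:

import org.apache.spark.sql.functions.{first, lit}

// exploded: one row per (product, category), with columns product_id and type
val wide = exploded
  .groupBy("product_id")
  .pivot("type")            // one column per distinct category name (Home, Kitchen, ...)
  .agg(first(lit(true)))    // true wherever the product has that category
  .na.fill(false)           // products without a category get false instead of null

Joining wide back onto the products DataFrame on product_id then gives the id | ProductName | ProductPrice | Home | Kitchen | ... shape from the question.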

Edit: Please include the following Maven dependency in order to import org.json.JSONArray:

<dependency>
    <groupId>org.json</groupId>
    <artifactId>json</artifactId>
    <version>20201115</version>
</dependency>
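
If you would rather not pull in the org.json dependency at all, the same parsing can be done with Spark's built-in from_json by spelling out the array schema by hand. A minimal sketch of that alternative, assuming the categories only ever contain id and type as in the sample data:

import org.apache.spark.sql.functions.{col, explode, from_json}
import org.apache.spark.sql.types._

// Explicit schema for the JSON string: an array of {id, type} objects
val categorySchema = ArrayType(StructType(Seq(
  StructField("id", LongType),
  StructField("type", StringType)
)))

val parsed = df
  .withColumn("category", explode(from_json(col("column_name"), categorySchema)))
  .select(col("category.id"), col("category.type"))

This avoids both the UDF and the extra pass over the data that spark.read.json makes to infer the schema.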