filter spark data frame by key & value of a dictionary


I have a Spark DataFrame (Scala) with many rows, which has a column that is a dictionary (a JSON string) of the following format:

[{"ID1":111,"ID2":2,"value":"Z"},
{"ID1":222,"ID2":3,"value":"A"},
{"ID1":333,"ID2":4,"value":"Z"},
{"ID1":444,"ID2":5,"value":"B"},
{"ID1":555,"ID2":6,"value":"Z"},
{"ID1":666,"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8,"value":"A"}]

I want to filter the DataFrame so that only rows containing a specific combination remain, for example ID1 = 111, ID2 = 2, value = Z. Note: not all rows necessarily have all of the keys; for example, a row might not have an "ID1" key at all.

How can this be done efficiently in Scala Spark? Thanks!

CodePudding user response:

You're dealing with an array of structs here, which can be a bit complicated. Let's split the problem into two parts: first, we parse that column into an array of structs. You can use the from_json function, but to make it work you need to provide the schema in advance. Here is a working example (it also covers the absence of some fields that you mentioned).


import spark.implicits._
import org.apache.spark.sql.functions.{from_json, col}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}

// Sample input: each row holds the JSON array as a plain string.
// The second row deliberately omits some of the fields.
val input = Seq(
  """[{"ID1":111,"ID2":2,"value":"Z"},
{"ID1":222,"ID2":3,"value":"A"},
{"ID1":333,"ID2":4,"value":"Z"},
{"ID1":444,"ID2":5,"value":"B"},
{"ID1":555,"ID2":6,"value":"Z"},
{"ID1":666,"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8,"value":"A"}]""",
  """[{"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8}]"""
).toDF("json")

// Schema of the JSON column: an array of structs with the three expected fields.
// Fields that are missing from the JSON simply come back as null.
val jsonSchema = ArrayType(
  new StructType()
    .add("ID1", IntegerType)
    .add("ID2", IntegerType)
    .add("value", StringType),
  containsNull = false
)

// Parse the string column into an array<struct> column.
val parsed = input.withColumn("parsed_json", from_json(col("json"), jsonSchema))
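
If you want to double-check the parsing before going further, a quick look at the schema and the parsed column is enough:

// Optional check: the json string column should now be parsed into array<struct<ID1:int, ID2:int, value:string>>.
parsed.printSchema()
parsed.select("parsed_json").show(false)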

Now that we have this, we can take advantage of the exists function (it is quite new; see https://mungingdata.com/spark-3/array-exists-forall-transform-aggregate-zip_with/). We will create a new column that tells us whether any of the structs matches the values we expect.

import org.apache.spark.sql.functions.exists
import org.apache.spark.sql.Column

// Predicate applied to each struct in the array: true only when all three fields match.
def matches(id1: Integer, id2: Integer, value: String)(col: Column) = {
  col.getField("ID1") === id1 && col.getField("ID2") === id2 && col.getField("value") === value
}

// display() is a Databricks notebook helper; in a plain spark-shell use .show(false) instead.
display(parsed.withColumn("found", exists(col("parsed_json"), matches(111, 2, "Z"))))

+-------------------------------------------------------------------------------------------+-----+
|parsed_json                                                                                |found|
+-------------------------------------------------------------------------------------------+-----+
|[{111, 2, Z}, {222, 3, A}, {333, 4, Z}, {444, 5, B}, {555, 6, Z}, {666, 7, Z}, {777, 8, A}]|true |
|[{null, 7, Z}, {777, 8, null}]                                                             |false|
+-------------------------------------------------------------------------------------------+-----+

With this new column you can filter the DataFrame down to the rows that contain the pattern. It may look like a lot of code, but it is quite robust.
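
In case it helps, here is a minimal sketch of that last step, reusing the parsed DataFrame and the matches helper defined above (the value names here are only illustrative):

// Option 1: keep the helper column and filter on it.
val flagged = parsed.withColumn("found", exists(col("parsed_json"), matches(111, 2, "Z")))
val filtered = flagged.filter(col("found"))

// Option 2: apply exists directly inside filter, without materialising the extra column.
val filteredDirect = parsed.filter(exists(col("parsed_json"), matches(111, 2, "Z")))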

CodePudding user response:

There are several functions in Spark SQL that can help you work with JSON values. The ones relevant to your request:

  • get_json_object: extracts a JSON object from a JSON string based on the specified JSON path, and returns the extracted object as a JSON string. It is helpful if you only need to access specific portions of the JSON
  • from_json: converts a JSON string column into a struct column, given the schema of the JSON. Once converted, you can access all the fields you need (see the short sketch right after this list)
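
To make the difference between the two concrete, here is a small sketch (the one-row sample DataFrame and the column aliases are only illustrative): get_json_object hands you back a string, while from_json hands you a typed struct.

import spark.implicits._
import org.apache.spark.sql.{functions => F}

// Illustrative one-row DataFrame, just to compare the return types of the two functions.
val sample = Seq("""{"ID1":111,"ID2":2,"value":"Z"}""").toDF("json")

sample.select(
  F.get_json_object($"json", "$.ID1").as("id1_as_string"),                                   // string "111"
  F.from_json($"json", "ID1 INT, ID2 INT, value STRING", Map[String, String]()).as("parsed") // struct<ID1:int,ID2:int,value:string>
).printSchema()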

Now, suppose you have a Dataset df with a json column containing your json string:

json
{ "ID1": 111, "ID2": 2, "value": "Z" }
{ "ID1": 222, "ID2": 3, "value": "A" }
{ "ID1": 333, "ID2": 4, "value": "Z" }
{ "ID1": 444, "ID2": 5, "value": "B" }
{ "ID1": 555, "ID2": 6, "value": "Z" }
{ "ID1": 666, "ID2": 7, "value": "Z" }
{ "ID1": 777, "ID2": 8, "value": "A" }

You can use get_json_object to filter the Dataset. You can also save the extracted value as a new column and use it as needed:

import spark.implicits._ // for the $"..." column syntax
import org.apache.spark.sql.{functions => F}

df
  .filter(F.get_json_object($"json", "$.ID1") === 111)
  .withColumn("value", F.get_json_object($"json", "$.value"))
  .filter($"value" === "Z")

or using from_json:

df
  .withColumn("json_as_struct", F.from_json($"json", "ID1 INT, ID2 INT, value STRING", Map[String, String]()))
  .filter($"json_as_struct.ID1" === 111 && $"json_as_struct.value" === "Z")