I have a Spark DataFrame (Scala) with many rows, which has a column containing a JSON string (an array of objects) of the following format:
[{"ID1":111,"ID2":2,"value":"Z"},
{"ID1":222,"ID2":3,"value":"A"},
{"ID1":333,"ID2":4,"value":"Z"},
{"ID1":444,"ID2":5,"value":"B"},
{"ID1":555,"ID2":6,"value":"Z"},
{"ID1":666,"ID2":7,"value":"Z"},
{"ID1":777,"ID2":8,"value":"A"}]
I want to filter the DataFrame so that it keeps only the rows that contain a specific combination, for example ID1 = 111, ID2 = 2, value = Z. Note: not all rows may have all of the keys; for example, an object might be missing the key "ID1".
How can this be done efficiently in Spark with Scala? Thanks!
CodePudding user response:
You're dealing with an array of structs here, which may be a bit complicated. Let's split the problem into two parts: first we will parse that column into an array of structs. You can use the function from_json, but in order to make it work you will need to provide the schema in advance. Here is a working example (it also covers the absence of some fields that you mentioned).
import spark.implicits._

val input = Seq(
  """[{"ID1":111,"ID2":2,"value":"Z"},
     {"ID1":222,"ID2":3,"value":"A"},
     {"ID1":333,"ID2":4,"value":"Z"},
     {"ID1":444,"ID2":5,"value":"B"},
     {"ID1":555,"ID2":6,"value":"Z"},
     {"ID1":666,"ID2":7,"value":"Z"},
     {"ID1":777,"ID2":8,"value":"A"}]""",
  """[{"ID2":7,"value":"Z"},
     {"ID1":777,"ID2":8}]"""
).toDF("json")
import org.apache.spark.sql.functions.{col, from_json}
import org.apache.spark.sql.types.{ArrayType, IntegerType, StringType, StructType}

// The schema has to be known in advance: an array of structs whose fields are all nullable.
val jsonSchema = ArrayType(
  new StructType()
    .add("ID1", IntegerType)
    .add("ID2", IntegerType)
    .add("value", StringType),
  containsNull = false
)

val parsed = input.withColumn("parsed_json", from_json(col("json"), jsonSchema))
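If you want to double-check what from_json produced, printSchema should show something like this:

parsed.printSchema()
// root
//  |-- json: string (nullable = true)
//  |-- parsed_json: array (nullable = true)
//  |    |-- element: struct (containsNull = false)
//  |    |    |-- ID1: integer (nullable = true)
//  |    |    |-- ID2: integer (nullable = true)
//  |    |    |-- value: string (nullable = true)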
Now that we have this, we can take advantage of the exists function (it is quite new, see https://mungingdata.com/spark-3/array-exists-forall-transform-aggregate-zip_with/). We're going to create a new column that tells us whether any of the structs matches the values we expect.
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.exists

// Curried so that matches(111, 2, "Z") becomes the Column => Column predicate exists expects.
def matches(id1: Integer, id2: Integer, value: String)(col: Column) =
  col.getField("ID1") === id1 && col.getField("ID2") === id2 && col.getField("value") === value

// display is Databricks-specific; outside a notebook use .show(false) instead.
display(parsed.withColumn("found", exists(col("parsed_json"), matches(111, 2, "Z"))))
+-------------------------------------------------------------------------------------------+-----+
|parsed_json                                                                                |found|
+-------------------------------------------------------------------------------------------+-----+
|[{111, 2, Z}, {222, 3, A}, {333, 4, Z}, {444, 5, B}, {555, 6, Z}, {666, 7, Z}, {777, 8, A}]|true |
|[{null, 7, Z}, {777, 8, null}]                                                             |false|
+-------------------------------------------------------------------------------------------+-----+
With this new column you can filter the DataFrame and enforce that pattern. It may look like a lot of code, but it is quite robust.
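For example, a minimal sketch of that final filtering step, reusing parsed and matches from above (the helper column is dropped once it has served its purpose):

val result = parsed
  .withColumn("found", exists(col("parsed_json"), matches(111, 2, "Z")))
  .filter(col("found")) // keep only rows whose array contains the combination
  .drop("found")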
CodePudding user response:
There are several functions in Spark SQL which can help you work with JSON values. The ones that can help with your request:

get_json_object: extracts a JSON object from a JSON string based on the JSON path specified, and returns the extracted object as a JSON string. It is helpful if you need to access specific portions of the JSON.

from_json: converts a JSON string column into a struct column, given the schema of the JSON. Once converted, you can access all the fields you need.

Now, suppose you have a Dataset df with a json column containing your JSON string:
+---------------------------------------+
|json                                   |
+---------------------------------------+
|{ "ID1": 111, "ID2": 2, "value": "Z" } |
|{ "ID1": 222, "ID2": 3, "value": "A" } |
|{ "ID1": 333, "ID2": 4, "value": "Z" } |
|{ "ID1": 444, "ID2": 5, "value": "B" } |
|{ "ID1": 555, "ID2": 6, "value": "Z" } |
|{ "ID1": 666, "ID2": 7, "value": "Z" } |
|{ "ID1": 777, "ID2": 8, "value": "A" } |
+---------------------------------------+
You can use get_json_object to filter the Dataset. You can also save the result as a new column and use it as you need:
import org.apache.spark.sql.{functions => F}
import spark.implicits._ // for the $"..." column syntax

df
  .filter(F.get_json_object($"json", "$.ID1") === 111)
  .withColumn("value", F.get_json_object($"json", "$.value"))
  .filter($"value" === "Z")
or using from_json:
df
  .withColumn("json_as_struct", F.from_json($"json", "ID1 INT, ID2 INT, value STRING", Map[String, String]()))
  .filter($"json_as_struct.ID1" === 111 && $"json_as_struct.value" === "Z")