I have a DataFrame with a column of type MapType<StringType, StringType>.
|-- identity: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
The identity column contains the key "update".
+-------------+
|identity     |
+-------------+
|[update -> Y]|
|[update -> Y]|
|[update -> Y]|
|[update -> Y]|
+-------------+
How do I change the value of key "update" from "Y" to "N"?
I'm using Spark version 2.3.
Any help will be appreciated. Thank you!
CodePudding user response:
AFAIK, in Spark 2.3 there are no built-in functions for transforming map values. The only way is probably to write a UDF:
import org.apache.spark.sql.functions.udf
import spark.implicits._ // assumes a SparkSession named spark (auto-imported in spark-shell)

// keys and values are strings so that the Map[String, String] UDF below can accept them
val df = Seq(Map("1" -> "2", "3" -> "4"), Map("7" -> "8", "1" -> "6")).toDF("m")

// a function that sets the value to "new" for every entry whose key equals "1"
val fun = udf((m: Map[String, String]) =>
  m.map { case (key, value) => (key, if (key == "1") "new" else value) }
)
df.withColumn("m", fun('m)).show(false)
+------------------+
|m                 |
+------------------+
|{1 -> new, 3 -> 4}|
|{7 -> 8, 1 -> new}|
+------------------+
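Applied to the question's identity column, the same pattern would look like this (a sketch; updateIdentity is a name I made up, and the null check accounts for the map being nullable):

import org.apache.spark.sql.functions.udf

// hypothetical helper: flip the value of key "update" from "Y" to "N"
val updateIdentity = udf((m: Map[String, String]) =>
  if (m == null) null
  else m.map { case (k, v) => (k, if (k == "update" && v == "Y") "N" else v) }
)

df.withColumn("identity", updateIdentity('identity))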
JSON solution
One alternative is to explode the map, make the updates, and re-aggregate it. Unfortunately, there is no way in Spark 2.3 to build a map from a dynamic number of entries. You can, however, aggregate the entries into a JSON dictionary and parse it back with the from_json function. I am pretty sure the first solution is more efficient, but who knows; in PySpark, where Python UDFs pay a serialization cost, this built-in-only approach might well be faster than the UDF.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{MapType, StringType}

df
  // tag each row so its map can be rebuilt after the explode
  .withColumn("id", monotonically_increasing_id)
  // one row per map entry, as columns "key" and "value"
  .select($"id", explode('m))
  // make the update
  .withColumn("value", when('key === "1", lit("new")).otherwise('value))
  // turn each entry into a "key" : "value" JSON fragment
  .withColumn("entry", concat(lit("\""), 'key, lit("\" : \""), 'value, lit("\"")))
  .groupBy("id").agg(collect_list('entry) as "list")
  // assemble the JSON dictionary and parse it back into a map
  .withColumn("json", concat(lit("{"), concat_ws(",", 'list), lit("}")))
  .withColumn("m", from_json('json, MapType(StringType, StringType)))
  .show(false)
Which yields the same result as before.
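For reference, if upgrading is an option: Spark 2.4 introduced map_from_entries, which removes the need for the JSON round-trip. A sketch of the same pipeline with it (not applicable to the asker's 2.3, and untested here):

import org.apache.spark.sql.functions._

df
  .withColumn("id", monotonically_increasing_id)
  .select($"id", explode('m))
  .withColumn("value", when('key === "1", lit("new")).otherwise('value))
  .groupBy("id")
  // Spark 2.4+: rebuild the map directly from (key, value) structs
  .agg(map_from_entries(collect_list(struct('key, 'value))) as "m")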