Create a new dataframe (with different schema) from selected information from another dataframe


I have a dataframe where the tags column contains key -> value pairs. I am trying to filter out the value information for entries where the key is "name", and put the filtered information into a new dataframe.

The initial df has the following schema:

root
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- nds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: long (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- ref: long (nullable = true)
 |    |    |-- role: string (nullable = true)
 |-- visible: boolean (nullable = true)

And I want a newdf with the following schema:

root
 |-- place: string (nullable = true)
 |-- num_evacuees: string (nullable = true)

How should I write the filter? I tried a lot of methods, starting with a plain filter at least, but every time the result is an empty dataframe. For example:

val newdf = df.filter($"tags"("key") contains "name")
val newdf = df.where(places("tags")("key") === "name")

I tried many more methods, but none of them worked. How do I write the proper filter?

CodePudding user response:

You can achieve the result you want with:

import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

val df = Seq(
  (1L, Map("sf" -> "100")),
  (2L, Map("ny" -> "200"))
).toDF("id", "tags")

val resultDf = df
  .select(explode(map_filter(col("tags"), (k, _) => k === "ny")))
  .withColumnRenamed("key", "place")
  .withColumnRenamed("value", "num_evacuees")

resultDf.printSchema
resultDf.show

Which will show:

root
 |-- place: string (nullable = false)
 |-- num_evacuees: string (nullable = true)

+-----+------------+
|place|num_evacuees|
+-----+------------+
|   ny|         200|
+-----+------------+

The key idea is to use map_filter to select the entries you want from the map; explode then turns the map into two columns (key and value), which you can rename to make the DataFrame match your specification.

The above example filters for a single value to demonstrate the idea. The lambda function passed to map_filter can be as complex as necessary; its signature, map_filter(expr: Column, f: (Column, Column) => Column): Column, shows that any expression returning a Column is accepted.
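
For instance, a minimal sketch of a lambda that tests both the key and the value (the key "name" and the non-empty check are illustrative assumptions, not something the question specifies):

// Keep entries whose key is "name" and whose value is non-empty.
val nameEntries = df.select(
  explode(map_filter(col("tags"), (k, v) => k === "name" && v =!= ""))
)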

If you wanted to filter a large number of entries you could do something like:

// Note: the literals must be wrapped in lit(); array("sf", "place_n")
// would instead reference columns named "sf" and "place_n".
val resultDf = df
  .withColumn("filterList", array(lit("sf"), lit("place_n")))
  .select(explode(map_filter(col("tags"), (k, _) => array_contains(col("filterList"), k))))
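
The same renames as in the first example then turn the exploded key/value columns into the requested schema:

val finalDf = resultDf
  .withColumnRenamed("key", "place")
  .withColumnRenamed("value", "num_evacuees")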

CodePudding user response:

The idea is to extract the keys of the map column (tags), then use array_contains to check for a key called "name".

import org.apache.spark.sql.functions._
import spark.implicits._ // assumes a SparkSession named `spark` is in scope

val newdf = df.filter(array_contains(map_keys($"tags"), "name"))
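
To reach the place/num_evacuees layout from the question, you can then pull values out of the map. A minimal sketch, assuming the value stored under "name" is the place and that the evacuee count lives under a hypothetical key "evacuees" (the question doesn't say which key holds it):

val resultDf = newdf.select(
  $"tags"("name").as("place"),            // value stored under the "name" key
  $"tags"("evacuees").as("num_evacuees")  // assumed key; adjust to the real one
)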