I have a dataframe with the following schema:
root
|-- id: long (nullable = true)
|-- type: string (nullable = true)
|-- tags: map (nullable = true)
| |-- key: string
| |-- value: string (valueContainsNull = true)
|-- lat: long (nullable = true)
|-- lon: long (nullable = true)
|-- nds: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- ref: long (nullable = true)
|-- members: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- type: string (nullable = true)
| | |-- ref: long (nullable = true)
| | |-- role: string (nullable = true)
I want to create a new dataframe res where I select specific data from the column tags. I need the values from key=place and key=population. The new dataframe should have the following schema:
val schema = StructType(
  Array(
    StructField("place", StringType),
    StructField("population", LongType)
  )
)
I have literally no idea how to do this. I tried to replicate the first dataframe and then select the columns, but that didn't work.
Does anyone have a solution?
CodePudding user response:
Let's call your original dataframe df. You can extract the information you want like this:
import org.apache.spark.sql.functions.col

val data = df
  .select(
    // tags is a map column, so look the values up by key directly
    col("tags")("place") as "place",
    col("tags")("population").cast("long") as "population"
  )
  // keep only rows that have at least one of the two keys
  .where(col("place").isNotNull || col("population").isNotNull)
  .collect()
  .toList
This will give you a List[Row], which can be converted to another dataframe with your schema:
import scala.collection.JavaConverters._

sparkSession.createDataFrame(data.asJava, schema)
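If you want to sanity-check the driver-side extraction without a Spark session, the same per-row logic can be sketched with plain Scala collections. The tag maps below are made-up sample data, not from the original dataframe:

```scala
// Hypothetical per-row tag maps, standing in for the collected `tags` column.
val tagMaps = List(
  Map("place" -> "city", "population" -> "100000"),
  Map("name" -> "foo") // neither key present
)

// Pull the two keys out of each map; a missing key becomes None.
val extracted = tagMaps.map { tags =>
  (tags.get("place"), tags.get("population").map(_.toLong))
}
// extracted == List((Some("city"), Some(100000L)), (None, None))
```

The None entries correspond to the nulls you get from the map lookup in the Spark version.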
CodePudding user response:
Given the following simplified input:
val df = Seq(
  (1L, Map("place" -> "home", "population" -> "1", "name" -> "foo")),
  (2L, Map("place" -> "home", "population" -> "4", "name" -> "foo")),
  (3L, Map("population" -> "3")),
  (4L, Map.empty[String, String])
).toDF("id", "tags")
You want to select the values using the map_filter method to filter the map down to only the key you want, then call map_values to get those entries. map_values returns an array, so you need to use explode_outer to flatten the data. We use explode_outer here because you might have entries which have neither place nor population, or only one of the two. Once the data is in a form we can easily work with, we just select the fields we want in the desired structure.
I've left the id column in so when you run the example you can see that we don't drop entries with missing data.
import org.apache.spark.sql.functions.{array, col, explode_outer, map_filter, map_values, struct}
import org.apache.spark.sql.types.LongType

val r = df
  .select(
    col("id"),
    explode_outer(map_values(map_filter(col("tags"), (k, v) => k === "place"))) as "place",
    map_values(map_filter(col("tags"), (k, v) => k === "population")) as "population"
  )
  .withColumn("population", explode_outer(col("population")))
  .select(
    col("id"),
    array(
      struct(
        col("place"),
        col("population") cast LongType as "population"
      ) as "place_and_population"
    ) as "data"
  )
Gives:
root
|-- id: long (nullable = false)
|-- data: array (nullable = false)
| |-- element: struct (containsNull = false)
| | |-- place: string (nullable = true)
| | |-- population: long (nullable = true)
+---+--------------+
| id|          data|
+---+--------------+
|  1|   [{home, 1}]|
|  2|   [{home, 4}]|
|  3|   [{null, 3}]|
|  4|[{null, null}]|
+---+--------------+
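For intuition, the per-row semantics of map_filter, map_values, and explode_outer can be mimicked with plain Scala collections. This is only a sketch of what each function does to one row's map; the sample data is made up and no Spark is needed:

```scala
// One row's tags map (hypothetical sample data).
val tags = Map("place" -> "home", "population" -> "1", "name" -> "foo")

// map_filter keeps only the entries whose key matches the predicate.
val placeOnly = tags.filter { case (k, _) => k == "place" }

// map_values drops the keys, leaving just the values (an array column in Spark).
val placeValues = placeOnly.values.toList // List("home")

// explode_outer turns each array element into a row, and an empty
// array into a single null row, so rows with no match are kept.
def explodeOuter(xs: List[String]): List[String] =
  if (xs.isEmpty) List(null) else xs
```

Calling explodeOuter on an empty list yields List(null), which is why rows 3 and 4 above survive with null fields instead of being dropped.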