Create new dataframe from selected information from another dataframe


I have a dataframe with the following schema:

root
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)
 |-- tags: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- lat: long (nullable = true)
 |-- lon: long (nullable = true)
 |-- nds: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- ref: long (nullable = true)
 |-- members: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |    |-- ref: long (nullable = true)
 |    |    |-- role: string (nullable = true)

I want to create a new dataframe res where I select specific data from the column tags. I need the values from key=place and key=population. The new dataframe should have the following schema:

val schema = StructType(
               Array(
                 StructField("place", StringType),
                 StructField("population", LongType)
               )
             )

I have literally no idea how to do this. I tried to replicate the first dataframe and then select the columns, but that didn't work.

Does anyone have a solution?

CodePudding user response:

Let's call your original dataframe df. You can extract the information you want like this:

import org.apache.spark.sql.functions.{array, arrays_overlap, col, lit, map_keys}

val data = df
  .select("tags")
  .where(
    // keep rows whose tags map contains at least one of the wanted keys
    arrays_overlap(map_keys(col("tags")), array(lit("place"), lit("population")))
  )
  .select(
    // extract the map values (null when a key is absent)
    col("tags")("place") as "place",
    col("tags")("population").cast("long") as "population"
  )
  .collect()
  .toList

This gives you a List[Row], which can be converted into another dataframe with your schema:

import scala.collection.JavaConverters._
import org.apache.spark.sql.Row

sparkSession.createDataFrame(data.asJava, schema)

CodePudding user response:

Given the following simplified input:

import spark.implicits._ // needed for .toDF

val df = Seq(
  (1L, Map("place" -> "home", "population" -> "1", "name" -> "foo")),
  (2L, Map("place" -> "home", "population" -> "4", "name" -> "foo")),
  (3L, Map("population" -> "3")),
  (4L, Map.empty[String, String])
).toDF("id", "tags")

You can select the values using map_filter to narrow the map down to only the key you want, then map_values to get those entries. map_values returns an array, so you need explode_outer to flatten the data. We use explode_outer here because some entries may have neither place nor population, or only one of the two. Once the data is in a form that's easy to work with, we simply select the fields we want in the desired structure.

I've left the id column in so when you run the example you can see that we don't drop entries with missing data.


import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.LongType

val r = df.select(
    col("id"),
    explode_outer(map_values(map_filter(col("tags"), (k, _) => k === "place"))) as "place",
    map_values(map_filter(col("tags"), (k, _) => k === "population")) as "population"
  ).withColumn("population", explode_outer(col("population")))
  .select(
    col("id"),
    array(
      struct(
        col("place"),
        col("population") cast LongType as "population"
      ) as "place_and_population"
    ) as "data"
  )

Gives:

root
 |-- id: long (nullable = false)
 |-- data: array (nullable = false)
 |    |-- element: struct (containsNull = false)
 |    |    |-- place: string (nullable = true)
 |    |    |-- population: long (nullable = true)

+---+--------------+
| id|          data|
+---+--------------+
|  1|   [{home, 1}]|
|  2|   [{home, 4}]|
|  3|   [{null, 3}]|
|  4|[{null, null}]|
+---+--------------+
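To see what the map_filter → map_values → explode_outer pipeline above does to each row, here is a plain-Scala sketch of the same steps on in-memory maps. This is an analogy, not the Spark API: the helpers filterKey and valueOrNull are made-up names for illustration.

```scala
// Sample tags maps, mirroring the simplified input above
val tags: List[Map[String, String]] = List(
  Map("place" -> "home", "population" -> "1", "name" -> "foo"),
  Map("population" -> "3"),
  Map.empty[String, String]
)

// map_filter analogue: keep only the entry whose key matches
def filterKey(m: Map[String, String], key: String): Map[String, String] =
  m.filter { case (k, _) => k == key }

// map_values + explode_outer analogue: take the surviving value,
// or null when the key is absent (explode_outer keeps the row, with null)
def valueOrNull(m: Map[String, String], key: String): String =
  filterKey(m, key).values.headOption.orNull

val result = tags.map(m => (valueOrNull(m, "place"), valueOrNull(m, "population")))
println(result)
// List((home,1), (null,3), (null,null))
```

Rows with a missing key still produce a result tuple (with null), which is exactly why the Spark version uses explode_outer rather than explode.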