We have a DataFrame with the following schema:
root
|-- id: string (nullable = true)
|-- key1_suffix1: string (nullable = true)
|-- key2_suffix1: string (nullable = true)
|-- suffix1: string (nullable = true)
|-- key1_suffix2: string (nullable = true)
|-- key2_suffix2: string (nullable = true)
|-- suffix2: string (nullable = true)
How can we convert it into a DataFrame with this nested schema?
root
|-- id: string (nullable = true)
|-- tags: struct (nullable = true)
| |-- suffix1: struct (nullable = true)
| | |-- key1_suffix1: string (nullable = true)
| | |-- key2_suffix1: string (nullable = true)
| | |-- suffix1: string (nullable = true)
| |-- suffix2: struct (nullable = true)
| | |-- key1_suffix2: string (nullable = true)
| | |-- key2_suffix2: string (nullable = true)
| | |-- suffix2: string (nullable = true)
The array of suffixes is given as input, for example inputSuffix = ["suffix1", "suffix2"].
This needs to be done in Spark with Scala (Spark 3.1, Scala 2.12).
Answer:
You can use the struct() function to group several columns into one nested column:
// test data (spark-shell, where `spark` is the active SparkSession)
import org.apache.spark.sql.functions.struct
import spark.implicits._

val df = Seq(
  ("1", "a", "b", "c", "d", "e", "f"),
  ("2", "aa", "bb", "cc", "dd", "ee", "ff")
).toDF("id", "key1_suffix1", "key2_suffix1", "suffix1", "key1_suffix2", "key2_suffix2", "suffix2")

// Processing: nest each suffix group in its own struct, wrap both in "tags",
// then drop the original flat columns
val res = df.withColumn("tags", struct(
    struct("key1_suffix1", "key2_suffix1", "suffix1").as("suffix1"),
    struct("key1_suffix2", "key2_suffix2", "suffix2").as("suffix2")))
  .drop("key1_suffix1", "key2_suffix1", "suffix1", "key1_suffix2", "key2_suffix2", "suffix2")
res.printSchema()
root
|-- id: string (nullable = true)
|-- tags: struct (nullable = false)
| |-- suffix1: struct (nullable = false)
| | |-- key1_suffix1: string (nullable = true)
| | |-- key2_suffix1: string (nullable = true)
| | |-- suffix1: string (nullable = true)
| |-- suffix2: struct (nullable = false)
| | |-- key1_suffix2: string (nullable = true)
| | |-- key2_suffix2: string (nullable = true)
| | |-- suffix2: string (nullable = true)
UPDATE
This can also be done dynamically from the list of suffixes. If the list contains a suffix whose columns don't exist in the DataFrame, filter it out first; otherwise struct() fails with an AnalysisException because it cannot resolve the missing column:
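// suffix3 has no matching columns in df, so it is filtered out below
val inputSuffix = Array("suffix1", "suffix2", "suffix3")
// keep only the suffixes whose three columns all exist in df
val inputSuffixFiltered = inputSuffix.filter(c =>
  Seq(s"key1_$c", s"key2_$c", c).forall(name => df.columns.contains(name)))
// one struct per remaining suffix, named after the suffix
val tagsCol = inputSuffixFiltered.map(c => struct(s"key1_$c", s"key2_$c", c).as(c))
// the flat columns that are now nested and can be dropped
val colsToDelete = inputSuffixFiltered.flatMap(c => Seq(s"key1_$c", s"key2_$c", c))
val res = df.withColumn("tags", struct(tagsCol: _*)).drop(colsToDelete: _*)
res.printSchema()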
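The part of the dynamic approach that needs the most care is deciding which flat columns go into which struct, and that part is plain Scala, so it can be checked without a SparkSession. Below is a minimal sketch of that step. It makes an assumption that goes beyond the answer above: instead of hard-coding the key1_/key2_ prefixes, any column named exactly like the suffix or ending in "_<suffix>" is treated as a member of that suffix's struct. SuffixGrouping and groupColumns are hypothetical names used only for illustration.

```scala
// Sketch: decide which flat columns go into which nested struct, using only
// plain Scala so the logic can be checked without a SparkSession.
// ASSUMPTION (generalization, not from the answer above): a column belongs to
// suffix s if it is named exactly s or ends with "_" + s.
object SuffixGrouping {

  // Hypothetical helper: returns (suffix -> member columns), keeping the order
  // in which the columns appear and skipping suffixes with no matching column.
  def groupColumns(columns: Seq[String], suffixes: Seq[String]): Seq[(String, Seq[String])] =
    suffixes
      .map(s => s -> columns.filter(c => c == s || c.endsWith("_" + s)))
      .filter { case (_, members) => members.nonEmpty }

  def main(args: Array[String]): Unit = {
    // Same column names as the test DataFrame in the answer above
    val columns = Seq("id", "key1_suffix1", "key2_suffix1", "suffix1",
                      "key1_suffix2", "key2_suffix2", "suffix2")
    val groups = groupColumns(columns, Seq("suffix1", "suffix2", "suffix3"))
    // prints:
    // suffix1 -> key1_suffix1, key2_suffix1, suffix1
    // suffix2 -> key1_suffix2, key2_suffix2, suffix2
    groups.foreach { case (s, members) => println(s"$s -> ${members.mkString(", ")}") }
  }
}
```

In Spark, each (suffix, members) pair could then become struct(members.head, members.tail: _*).as(suffix), with all members passed to drop, just as in the UPDATE. The matching rule is a heuristic: with suffixes such as "x" and "a_x", a column like "k_a_x" would match both.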