Group different columns together into nested structs based on a list of suffixes

Time:01-14

We have a DataFrame with the following schema:

root
 |-- id: string (nullable = true)
 |-- key1_suffix1: string (nullable = true)
 |-- key2_suffix1: string (nullable = true)
 |-- suffix1: string (nullable = true)
 |-- key1_suffix2: string (nullable = true)
 |-- key2_suffix2: string (nullable = true)
 |-- suffix2: string (nullable = true)

How can we convert it into a DataFrame with this schema:

root
 |-- id: string (nullable = true)
 |-- tags: struct (nullable = true)
 |     |-- suffix1: struct (nullable = true)
 |     |     |-- key1_suffix1: string (nullable = true)
 |     |     |-- key2_suffix1: string (nullable = true)
 |     |     |-- suffix1: string (nullable = true)
 |     |-- suffix2: struct (nullable = true)
 |     |     |-- key1_suffix2: string (nullable = true)
 |     |     |-- key2_suffix2: string (nullable = true)
 |     |     |-- suffix2: string (nullable = true)

The array of suffixes is given as input, for example inputSuffix = ["suffix1", "suffix2"].

This is needed in Spark Scala code (Spark 3.1, Scala 2.12).

Answer:

You can use the struct() function to group several columns into one nested column:

// test data (spark-shell, where `spark` is predefined)
import spark.implicits._
import org.apache.spark.sql.functions.struct
val df = Seq(
  ("1", "a", "b", "c", "d", "e", "f"),
  ("2", "aa", "bb", "cc", "dd", "ee", "ff")
).toDF("id", "key1_suffix1", "key2_suffix1", "suffix1", "key1_suffix2", "key2_suffix2", "suffix2")

// Processing: build one struct per suffix, wrap them in "tags", then drop the flat columns
val res = df.withColumn("tags", struct(
    struct("key1_suffix1", "key2_suffix1", "suffix1").as("suffix1"),
    struct("key1_suffix2", "key2_suffix2", "suffix2").as("suffix2")
  )).drop("key1_suffix1", "key2_suffix1", "suffix1", "key1_suffix2", "key2_suffix2", "suffix2")

res.printSchema()


root
 |-- id: string (nullable = true)
 |-- tags: struct (nullable = false)
 |    |-- suffix1: struct (nullable = false)
 |    |    |-- key1_suffix1: string (nullable = true)
 |    |    |-- key2_suffix1: string (nullable = true)
 |    |    |-- suffix1: string (nullable = true)
 |    |-- suffix2: struct (nullable = false)
 |    |    |-- key1_suffix2: string (nullable = true)
 |    |    |-- key2_suffix2: string (nullable = true)
 |    |    |-- suffix2: string (nullable = true)

UPDATE

This can also be done dynamically from the list of suffixes. If the list contains a suffix whose columns don't exist in the DataFrame, filter it out first; otherwise struct() fails with an AnalysisException because it references a column that cannot be resolved:

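// "suffix3" has no matching columns and is filtered out below
val inputSuffix = Array("suffix1", "suffix2", "suffix3")
// Keep only suffixes whose three columns all exist in df
val inputSuffixFiltered = inputSuffix.filter(c =>
  df.columns.contains(s"key1_$c") && df.columns.contains(s"key2_$c") && df.columns.contains(c))
// One struct per remaining suffix, named after the suffix
val tagsCol = inputSuffixFiltered.map(c => struct(s"key1_$c", s"key2_$c", c).as(c))
// Flat columns that are now nested inside "tags"
val colsToDelete = inputSuffixFiltered.flatMap(c => Seq(s"key1_$c", s"key2_$c", c))
val res = df.withColumn("tags", struct(tagsCol: _*)).drop(colsToDelete: _*)
res.printSchema()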
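The filter above only recognizes the fixed key1_ and key2_ prefixes. If the number of key columns per suffix can vary, one option (an assumption, not something the question requires) is to derive each group from the column names themselves. The sketch below does just that step in plain Scala so it can be checked without a Spark session. groupColumnsBySuffix is a hypothetical helper name, and it assumes every column belonging to a suffix is either the suffix itself or ends with "_<suffix>", and that no column name ends with two of the given suffixes.

```scala
// Hypothetical helper, not part of the answer above: for each suffix, collect the bare
// "<suffix>" column plus every column named "<anything>_<suffix>", keeping the
// DataFrame's own column order. Suffixes that match no column are dropped.
def groupColumnsBySuffix(columns: Seq[String], suffixes: Seq[String]): Seq[(String, Seq[String])] = {
  val matched = suffixes.map(s => s -> columns.filter(c => c == s || c.endsWith("_" + s)))
  matched.filter { case (_, cols) => cols.nonEmpty }
}

// Column names taken from the question's schema
val columns = Seq("id", "key1_suffix1", "key2_suffix1", "suffix1",
  "key1_suffix2", "key2_suffix2", "suffix2")
val groups = groupColumnsBySuffix(columns, Seq("suffix1", "suffix2", "suffix3"))
groups.foreach { case (s, cols) => println(s"$s -> ${cols.mkString(", ")}") }
// prints:
// suffix1 -> key1_suffix1, key2_suffix1, suffix1
// suffix2 -> key1_suffix2, key2_suffix2, suffix2

// Not executed here: in spark-shell, with the df from the answer, the groups could drive the nesting:
//   import org.apache.spark.sql.functions.{col, struct}
//   val grouped = groupColumnsBySuffix(df.columns.toSeq, Seq("suffix1", "suffix2"))
//   val tags = grouped.map { case (s, cols) => struct(cols.map(col): _*).as(s) }
//   val res = df.withColumn("tags", struct(tags: _*)).drop(grouped.flatMap(_._2): _*)
```

Matching on the "_<suffix>" ending rather than a plain endsWith(suffix) keeps a suffix like "suffix1" from accidentally claiming columns such as "key1_suffix11".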