I have a dataframe df:
+----------------+------------------+------------+
|            name| languagesAtSchool|currentState|
+----------------+------------------+------------+
|    James,,Smith|[Java, Scala, C++]|          CA|
|   Michael,Rose,|[Spark, Java, C++]|          NJ|
|Robert,,Williams|   [CSharp, VB, R]|          NV|
+----------------+------------------+------------+
I want:
+----------------+--------+-----+
|Name            |language|State|
+----------------+--------+-----+
|James,,Smith    |Java    |CA   |
|James,,Smith    |Scala   |CA   |
|James,,Smith    |C++     |CA   |
|Michael,Rose,   |Spark   |NJ   |
|Michael,Rose,   |Java    |NJ   |
|Michael,Rose,   |C++     |NJ   |
|Robert,,Williams|CSharp  |NV   |
|Robert,,Williams|VB      |NV   |
|Robert,,Williams|R       |NV   |
+----------------+--------+-----+
I have tried the following, which works perfectly:
val df2 = df.flatMap(f => f.getSeq[String](1).map((f.getString(0), _, f.getString(2))))
  .toDF("Name", "language", "State")
but I want something that works without specifying the other columns to keep, so I tried:
val df2 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))
Then it gives
Unknown Error: <console>:40: error: missing parameter type
val df3 = df.withColumn("laguage", df.flatMap(f=>f.getSeq[String](1)))
^
Therefore I want something in Spark that transforms a column without discarding the others. I guess the reason for the error is that Scala is unable to infer the type of the lambda parameter, but I cannot fix it myself. I'm new to Scala, and thanks for your help!
CodePudding user response:
The method you're looking for is explode:

def explode(e: Column): Column
Creates a new row for each element in the given array or map column. Uses the default column name col for elements in the array and key and value for elements in the map unless specified otherwise. Since 1.3.0

df.withColumn("language", explode(col("languagesAtSchool")))
CodePudding user response:
explode is exactly for this case: it splits an array column so that each element of the array ends up in a separate row.
Here is a full example with outputs:
package org.example

import org.apache.spark.sql._
import org.apache.spark.sql.functions.{col, explode}
import org.apache.spark.sql.types.{ArrayType, StringType, StructType}

object App {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession.builder().appName("test").master("local[*]").getOrCreate()
    import spark.implicits._

    // create a dataframe with test data
    val data = Seq(
      Row("James,,Smith", List("java", "scala"), "ca"),
      Row("Robert,,Williams", List("c", "c++"), "nv")
    )
    val schema = new StructType()
      .add("name", StringType)
      .add("languages", ArrayType(StringType))
      .add("current_state", StringType)
    val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
    df.show(false)
    // +----------------+-------------+-------------+
    // |name            |languages    |current_state|
    // +----------------+-------------+-------------+
    // |James,,Smith    |[java, scala]|ca           |
    // |Robert,,Williams|[c, c++]     |nv           |
    // +----------------+-------------+-------------+

    // use explode to split the array values into different rows
    df.withColumn("language", explode(col("languages"))).drop("languages").show()
    // +----------------+-------------+--------+
    // |            name|current_state|language|
    // +----------------+-------------+--------+
    // |    James,,Smith|           ca|    java|
    // |    James,,Smith|           ca|   scala|
    // |Robert,,Williams|           nv|       c|
    // |Robert,,Williams|           nv|     c++|
    // +----------------+-------------+--------+
  }
}
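If you also need the exact column names from the question's desired output, the renaming can be folded into a select with aliases (a sketch reusing the example's schema above):

df.select(
  col("name").as("Name"),
  explode(col("languages")).as("language"),
  col("current_state").as("State")
).show(false)

Since explode is a generator function, it can be used directly inside select; every other selected column is simply repeated for each generated row.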