Spark Scala split column values in a dataframe to appended lists


I have data in a Spark DataFrame where I need to search for elements by name, append the values to a list, and split the searched elements into separate columns of the DataFrame.

I am using Scala, and below is an example of my current code. It works to get the first value, but I need to append all available values, not just the first.

I'm new to Scala (and Python), so any help will be greatly appreciated!

val getNumber: (String => String) = (colString: String) => {
  if (colString != null)
    // extracts only the first "number:" value in the string
    raw"number:(\d+)".r
      .findFirstMatchIn(colString)
      .map(_.group(1))
      .orNull
  else
    null
}

val udfGetColumn = udf(getNumber)

val mydf = df.select(cols.....)
.withColumn("var_number", udfGetColumn($"var"))

Example Data:

+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|   key|var                                                                                                                                                            |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+
|1     |["[number:123456 rate:111970 position:1]","[number:123457 rate:662352 position:2]","[number:123458 rate:890 position:3]","[number:123459 rate:190 position:4]"]|
|2     |["[number:654321 rate:211971 position:1]","[number:654322 rate:124 position:2]","[number:654323 rate:421 position:3]"]                                         |
+------+---------------------------------------------------------------------------------------------------------------------------------------------------------------+

Desired Result:

+------+-----------------+---------------------+--------------------+
|   key|     var_number  |  var_rate           |    var_position    |
+------+-----------------+---------------------+--------------------+
|1     |       123456    |   111970            |         1          |
|1     |       123457    |   662352            |         2          |
|1     |       123458    |   890               |         3          |
|1     |       123459    |   190               |         4          |
|2     |       654321    |   211971            |         1          |
|2     |       654322    |   124               |         2          |
|2     |       654323    |   421               |         3          |
+------+-----------------+---------------------+--------------------+

CodePudding user response:

You don't need to use a UDF here. You can transform the array column var by converting each element into a map with str_to_map, after removing the square brackets ([]) using the regexp_replace function. Finally, explode the transformed array and select the fields:

val df = Seq(
  (1, Seq("[number:123456 rate:111970 position:1]", "[number:123457 rate:662352 position:2]", "[number:123458 rate:890 position:3]", "[number:123459 rate:190 position:4]")),
  (2, Seq("[number:654321 rate:211971 position:1]", "[number:654322 rate:124 position:2]", "[number:654323 rate:421 position:3]"))
).toDF("key", "var")

val result = df.withColumn(
  "var", 
  explode(expr(raw"transform(var, x -> str_to_map(regexp_replace(x, '[\\[\\]]', ''), ' '))"))
).select(
  col("key"),
  col("var").getField("number").alias("var_number"),
  col("var").getField("rate").alias("var_rate"),
  col("var").getField("position").alias("var_position")
)

result.show
//+---+----------+--------+------------+
//|key|var_number|var_rate|var_position|
//+---+----------+--------+------------+
//|  1|    123456|  111970|           1|
//|  1|    123457|  662352|           2|
//|  1|    123458|     890|           3|
//|  1|    123459|     190|           4|
//|  2|    654321|  211971|           1|
//|  2|    654322|     124|           2|
//|  2|    654323|     421|           3|
//+---+----------+--------+------------+
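Note that str_to_map produces string values, so var_number, var_rate, and var_position come out as strings. If numeric columns are needed downstream, a small follow-up cast works (a sketch; the target types here are assumptions):

import org.apache.spark.sql.functions.col

// Cast the extracted string fields to numeric types (illustrative target types)
val typed = result
  .withColumn("var_number", col("var_number").cast("long"))
  .withColumn("var_rate", col("var_rate").cast("long"))
  .withColumn("var_position", col("var_position").cast("int"))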

From your comment, it appears the column var is of type string, not array. In this case, you can first transform it by removing the [] and " characters, then split by comma to get an array:

val df = Seq(
  (1, """["[number:123456 rate:111970 position:1]", "[number:123457 rate:662352 position:2]", "[number:123458 rate:890 position:3]", "[number:123459 rate:190 position:4]"]"""),
  (2, """["[number:654321 rate:211971 position:1]", "[number:654322 rate:124 position:2]", "[number:654323 rate:421 position:3]"]""")
).toDF("key", "var")

val result = df.withColumn(
  "var", 
  split(regexp_replace(col("var"), "[\\[\\]\"]", ""), ",")
).withColumn(
  "var", 
  explode(expr("transform(var, x -> str_to_map(x, ' '))"))
).select(
  // select your columns as above...
)
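One small caveat: splitting on "," leaves a leading space on every element after the first, which str_to_map then parses as an extra empty entry in each map. A sketch of one way to keep the maps clean is to split on the comma plus any following whitespace (the name trimmed is illustrative):

val trimmed = df.withColumn(
  "var",
  // ",\\s*" also consumes the space that follows each comma
  split(regexp_replace(col("var"), "[\\[\\]\"]", ""), ",\\s*")
).withColumn(
  "var",
  explode(expr("transform(var, x -> str_to_map(x, ' '))"))
)
// then select key and the map fields exactly as before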