Replace column string name with another column value in Spark Scala

Time:11-21

I have the following dataframe with a column sig and N other columns.

The sig value contains the names of some of the other columns embedded in the string, as shown below. Any number of the dataframe's column names can appear in it.

I want to update the sig column by replacing each embedded column name with the value of that column in the same row.

For example,

+---------------------------------------------------------------------+------------+------------------+-------------------+--------+
|sig                                                                  |order_timing|po_manl_create_ind|mabd_arrival_status|cut_time|
+---------------------------------------------------------------------+------------+------------------+-------------------+--------+
|R1:BR1-order_timing:BR2-po_manl_create_ind:BR3-mabd_arrival_status:R1|14          |0                 |late               |23      |
|R1:BR1-order_timing:BR2-po_manl_create_ind:BR7-cut_time:R1           |14          |0                 |on_time            |10      |
+---------------------------------------------------------------------+------------+------------------+-------------------+--------+

Expected output

+---------------------------+------------+------------------+-------------------+--------+
|sig                        |order_timing|po_manl_create_ind|mabd_arrival_status|cut_time|
+---------------------------+------------+------------------+-------------------+--------+
|R1:BR1-14:BR2-0:BR3-late:R1|14          |0                 |late               |23      |
|R1:BR1-14:BR2-0:BR7-10:R1  |14          |0                 |on_time            |10      |
+---------------------------+------------+------------------+-------------------+--------+

Answer:

One way is to chain multiple replace expressions, one for each column whose name may appear in the sig values.

Using this sample DF:

// Needed for toDF outside spark-shell; assumes a SparkSession named spark is in scope.
import spark.implicits._

val df = Seq(
  ("R1:BR1-order_timing:BR2-po_manl_create_ind:BR3-mabd_arrival_status:R1", 14, 0, "late", 23),
  ("R1:BR1-order_timing:BR2-po_manl_create_ind:BR7-cut_time:R1", 14, 0, "on_time", 10)
).toDF("sig", "order_timing", "po_manl_create_ind", "mabd_arrival_status", "cut_time")

You can generate the replacement expression replace_expr using foldLeft like this:

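import org.apache.spark.sql.functions.expr

// Start from the sig column and wrap it in one SQL replace(...) call per remaining column,
// so each column name found in sig is swapped for that column's value in the same row.
val replace_expr = df.columns
  .filter(_ != "sig")
  .foldLeft("sig")((acc, c) => s"replace($acc, '$c', $c)")

df.withColumn("sig", expr(replace_expr)).show(false)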

//+---------------------------+------------+------------------+-------------------+--------+
//|sig                        |order_timing|po_manl_create_ind|mabd_arrival_status|cut_time|
//+---------------------------+------------+------------------+-------------------+--------+
//|R1:BR1-14:BR2-0:BR3-late:R1|14          |0                 |late               |23      |
//|R1:BR1-14:BR2-0:BR7-10:R1  |14          |0                 |on_time            |10      |
//+---------------------------+------------+------------------+-------------------+--------+
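
To see what the fold actually produces, here is a small sketch in plain Scala with no Spark dependency. The object name ReplaceExprSketch and the row map are made up for illustration; the column names and the sig value are taken from the sample DF above. It builds the same expression string, then mimics on ordinary strings what the nested replace calls do for the first row.

```scala
object ReplaceExprSketch {
  // Column names of the sample DF (hard-coded here instead of reading df.columns).
  val columns: Seq[String] =
    Seq("sig", "order_timing", "po_manl_create_ind", "mabd_arrival_status", "cut_time")

  // Same fold as in the answer: each step wraps the previous expression in another replace(...).
  val replaceExpr: String = columns
    .filter(_ != "sig")
    .foldLeft("sig")((acc, c) => s"replace($acc, '$c', $c)")

  // Hypothetical stand-in for the first row: column name -> value rendered as a string.
  val row: Map[String, String] = Map(
    "order_timing" -> "14",
    "po_manl_create_ind" -> "0",
    "mabd_arrival_status" -> "late",
    "cut_time" -> "23"
  )

  val sig: String = "R1:BR1-order_timing:BR2-po_manl_create_ind:BR3-mabd_arrival_status:R1"

  // Plain substring replacement, one column at a time, like the nested SQL replace calls.
  val resolved: String =
    row.foldLeft(sig) { case (acc, (name, value)) => acc.replace(name, value) }

  def main(args: Array[String]): Unit = {
    println(replaceExpr)
    println(resolved)
  }
}
```

Printing replaceExpr shows four nested replace calls, with replace(sig, 'order_timing', order_timing) innermost and the cut_time replacement outermost. Since this is plain substring replacement, a column whose name is contained in another column's name could be substituted inside the longer name, so in that situation the order of the fold would matter.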