pyspark create new case statement variable within struct for each array element

Time:08-16

I want to create a new variable within each struct element of the array in this DataFrame. The new variable would be called 'statusrating' and would map each status string to an integer: "APPROVED" = 1, "PENDING" = 0, "REJECTED" = -1. I assume this requires a CASE WHEN statement, but I haven't been able to figure it out. Could someone help me with the correct code?

Current df:

form_info: array
|   element: struct
|   |    id: string
|   |    status: string

Desired df:

form_info: array
|   element: struct
|   |    id: string
|   |    status: string
|   |    statusrating: integer

Answer:

For Spark 3.1+, you can use withField to add a new field to an existing struct column.

import pyspark.sql.functions as func

# Assumes old_struct is a single struct column with a status field.
# Rows whose status matches none of the branches get a null new_struct.
data_sdf. \
    withColumn('new_struct',
               func.when(func.col('old_struct.status') == 'APPROVED',
                         func.col('old_struct').withField('statusrating', func.lit(1))
                         ).
               when(func.col('old_struct.status') == 'PENDING',
                    func.col('old_struct').withField('statusrating', func.lit(0))
                    ).
               when(func.col('old_struct.status') == 'REJECTED',
                    func.col('old_struct').withField('statusrating', func.lit(-1))
                    )
               )

If withField is unavailable (it was added in Spark 3.1), you need to recreate the whole struct.

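import pyspark.sql.functions as func

# struct(old_struct.*, ...) rebuilds the struct from its existing fields plus statusrating.
# Rows whose status matches none of the branches get a null new_struct.
data_sdf. \
    withColumn('new_struct',
               func.when(func.col('old_struct.status') == 'APPROVED',
                         func.expr('struct(old_struct.*, 1 as statusrating)')
                         ).
               when(func.col('old_struct.status') == 'PENDING',
                    func.expr('struct(old_struct.*, 0 as statusrating)')
                    ).
               when(func.col('old_struct.status') == 'REJECTED',
                    func.expr('struct(old_struct.*, -1 as statusrating)')
                    )
               )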

Based on the example row shared in the question comments, you actually have an array of structs, so you need to use transform.

Given the following input data, an array of structs with id and status as struct fields:

# +---------------------------------------------------------------+
# |form_info                                                      |
# +---------------------------------------------------------------+
# |[{12345667, APPROVED}, {1789083, PENDING}, {1789876, REJECTED}]|
# +---------------------------------------------------------------+

Use transform to update each struct within the array:

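import pyspark.sql.functions as func

# transform (a Spark SQL higher-order function since 2.4) rebuilds each array element
# with an extra statusrating field; any other status gets a null rating.
data_sdf. \
    withColumn('form_info_updated',
               func.expr('''transform(form_info, x -> if(x.status = "APPROVED", struct(x.id, x.status, 1 as statusrating),
                                                         if(x.status = "PENDING", struct(x.id, x.status, 0 as statusrating),
                                                            if(x.status = "REJECTED", struct(x.id, x.status, -1 as statusrating), struct(x.id, x.status, null as statusrating))
                                                            )
                                                         )
                                      )
                         ''')
               ). \
    show(truncate=False)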

# +---------------------------------------------------------------+-------------------------------------------------------------------------+
# |form_info                                                      |form_info_updated                                                        |
# +---------------------------------------------------------------+-------------------------------------------------------------------------+
# |[{12345667, APPROVED}, {1789083, PENDING}, {1789876, REJECTED}]|[{12345667, APPROVED, 1}, {1789083, PENDING, 0}, {1789876, REJECTED, -1}]|
# +---------------------------------------------------------------+-------------------------------------------------------------------------+

Answer:

I didn't notice this question was about PySpark, but I'll post this anyway in case someone finds the Scala solution useful.

In this scenario, I would use a Dataset and a mapping function. I think it's a clean solution, and it scales well if you have many cases to match.

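  // Assumes a spark-shell session, where the SparkSession is available as `spark`.
  import spark.implicits._

  // Element types are declared before the classes that use them, so the
  // definitions also work when entered line by line in the shell.
  case class Status(id: String, status: String)

  case class Form(form_info: String, element: Array[Status])

  case class NewStatus(id: String, status: String, statusInt: Int)

  case class NewForm(form_info: String, element: Array[NewStatus])

  val ds = Seq(
    Form("Form1", Array(
      Status("id1", "APPROVED"),
      Status("id2", "PENDING"),
      Status("id3", "REJECTED")
    ))
  ).toDS()

  // Any status other than these three throws a scala.MatchError.
  def mapStatusToInt(status: String): Int = {
    status match {
      case "APPROVED" => 1
      case "PENDING" => 0
      case "REJECTED" => -1
    }
  }

  // Rebuild each Form, adding the integer rating to every element of the array.
  val newDs = ds.map {
    form: Form => {
      NewForm(form.form_info, form.element.map(
        status => NewStatus(status.id, status.status, mapStatusToInt(status.status))
      ))
    }
  }

  newDs.printSchema()
  newDs.show(false)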

Result:

root
 |-- form_info: string (nullable = true)
 |-- element: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- status: string (nullable = true)
 |    |    |-- statusInt: integer (nullable = false)

+---------+------------------------------------------------------------+
|form_info|element                                                     |
+---------+------------------------------------------------------------+
|Form1    |[{id1, APPROVED, 1}, {id2, PENDING, 0}, {id3, REJECTED, -1}]|
+---------+------------------------------------------------------------+

A similar approach should be possible in PySpark, although a PySpark DataFrame has no typed map method, so it would have to go through DataFrame.rdd.map. I will update the answer if I figure it out.
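For readers who want this mapping-function approach in PySpark, a rough sketch via DataFrame.rdd.map might look like the following. The names STATUS_RATING, add_status_rating, and run_with_spark are hypothetical, the local SparkSession is an assumption, and unlike the Scala version an unknown status maps to None (a null rating) instead of raising an error:

```python
# Hypothetical lookup table; statuses not listed map to None (a null rating).
STATUS_RATING = {"APPROVED": 1, "PENDING": 0, "REJECTED": -1}


def add_status_rating(row):
    """Return (form_info, elements), appending a rating to every element of the array."""
    form_info, elements = row
    return (
        form_info,
        [(e["id"], e["status"], STATUS_RATING.get(e["status"])) for e in elements],
    )


def run_with_spark():
    # Imported here so add_status_rating can be used without pyspark installed.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.master("local[1]").getOrCreate()
    ds = spark.createDataFrame(
        [("Form1", [("id1", "APPROVED"), ("id2", "PENDING"), ("id3", "REJECTED")])],
        "form_info string, element array<struct<id:string,status:string>>",
    )
    # rdd.map returns plain tuples, so the output schema is supplied explicitly.
    new_schema = (
        "form_info string, "
        "element array<struct<id:string,status:string,statusInt:int>>"
    )
    new_ds = spark.createDataFrame(ds.rdd.map(add_status_rating), new_schema)
    new_ds.printSchema()
    new_ds.show(truncate=False)
    return new_ds
```

Calling run_with_spark() builds the sample data and prints the result; add_status_rating itself is plain Python and can be checked without Spark. Since every row makes a round trip through Python, the column-expression approaches in the other answer are generally the faster choice.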

Hope this helps in the off chance that you can use Scala.
