I want to create a new variable within each struct element of the array in this data frame. The new variable would be called 'statusrating' and would map each status string to an integer: "APPROVED" = 1, "PENDING" = 0, "REJECTED" = -1. I assume this requires a CASE WHEN statement, but I haven't been able to figure it out. Could someone help me with the correct code?
Current df:
form_info:array
| element:struct
| | id:string
| | status:string
Desired df:
form_info:array
| element:struct
| | id:string
| | status:string
| | statusrating:integer
CodePudding user response:
For Spark 3.1+, you can use withField to add a new field within an existing struct.
from pyspark.sql import functions as func

data_sdf. \
    withColumn('new_struct',
               func.when(func.col('old_struct.status') == 'APPROVED',
                         func.col('old_struct').withField('statusrating', func.lit(1))
                         ).
               when(func.col('old_struct.status') == 'PENDING',
                    func.col('old_struct').withField('statusrating', func.lit(0))
                    ).
               when(func.col('old_struct.status') == 'REJECTED',
                    func.col('old_struct').withField('statusrating', func.lit(-1))
                    )
               )
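Since only the rating varies between the branches, the when chain can also be pushed inside a single withField call. A minimal sketch, using the same data_sdf and old_struct names as above:
data_sdf. \
    withColumn('new_struct',
               func.col('old_struct').withField(
                   'statusrating',
                   # when() without otherwise() leaves unmatched statuses as null
                   func.when(func.col('old_struct.status') == 'APPROVED', func.lit(1)).
                   when(func.col('old_struct.status') == 'PENDING', func.lit(0)).
                   when(func.col('old_struct.status') == 'REJECTED', func.lit(-1))
               )
               )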
If withField is unavailable (i.e., Spark < 3.1), you'd need to recreate the whole struct.
data_sdf. \
    withColumn('new_struct',
               func.when(func.col('old_struct.status') == 'APPROVED',
                         func.expr('struct(old_struct.*, 1 as statusrating)')
                         ).
               when(func.col('old_struct.status') == 'PENDING',
                    func.expr('struct(old_struct.*, 0 as statusrating)')
                    ).
               when(func.col('old_struct.status') == 'REJECTED',
                    func.expr('struct(old_struct.*, -1 as statusrating)')
                    )
               )
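Since you suspected a CASE WHEN, note that the same struct recreation can be written with one CASE expression instead of three when branches. A sketch, untested against your data:
data_sdf. \
    withColumn('new_struct',
               func.expr('''
                   struct(old_struct.*,
                          case old_struct.status
                              when 'APPROVED' then 1
                              when 'PENDING' then 0
                              when 'REJECTED' then -1
                          end as statusrating)
               '''))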
Based on the example row shared in the question comments, you have an array of structs and would need to use transform.
Given the input data as the following array of structs, with id and status as struct fields:
# +---------------------------------------------------------------+
# |form_info                                                      |
# +---------------------------------------------------------------+
# |[{12345667, APPROVED}, {1789083, PENDING}, {1789876, REJECTED}]|
# +---------------------------------------------------------------+
You'll use transform to update the values within the array.
data_sdf. \
    withColumn('form_info_updated',
               func.expr('''
                   transform(form_info,
                             x -> if(x.status = "APPROVED", struct(x.id, x.status, 1 as statusrating),
                                     if(x.status = "PENDING", struct(x.id, x.status, 0 as statusrating),
                                        if(x.status = "REJECTED", struct(x.id, x.status, -1 as statusrating),
                                           struct(x.id, x.status, null as statusrating))))
                            )
               ''')
               ). \
    show(truncate=False)
# +---------------------------------------------------------------+-------------------------------------------------------------------------+
# |form_info                                                      |form_info_updated                                                        |
# +---------------------------------------------------------------+-------------------------------------------------------------------------+
# |[{12345667, APPROVED}, {1789083, PENDING}, {1789876, REJECTED}]|[{12345667, APPROVED, 1}, {1789083, PENDING, 0}, {1789876, REJECTED, -1}]|
# +---------------------------------------------------------------+-------------------------------------------------------------------------+
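The nested ifs can equivalently be flattened into the CASE WHEN form you were originally after, inside the same transform lambda. A sketch of that variant:
data_sdf. \
    withColumn('form_info_updated',
               func.expr('''
                   transform(form_info,
                             x -> struct(x.id, x.status,
                                         case x.status
                                             when 'APPROVED' then 1
                                             when 'PENDING' then 0
                                             when 'REJECTED' then -1
                                         end as statusrating))
               ''')
               ). \
    show(truncate=False)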
CodePudding user response:
I didn't notice this was in PySpark, but I'll post this anyway in case someone finds the solution useful in Scala.
In this scenario, I would use a Dataset and a mapping function. I think it's a clean solution, and it scales well if you have many case matches.
import spark.implicits._

// input schema
case class Form(form_info: String, element: Array[Status])
case class Status(id: String, status: String)

// output schema, with the extra integer field
case class NewForm(form_info: String, element: Array[NewStatus])
case class NewStatus(id: String, status: String, statusInt: Int)

val ds = Seq(
  Form("Form1", Array(
    Status("id1", "APPROVED"),
    Status("id2", "PENDING"),
    Status("id3", "REJECTED")
  ))
).toDS()

// note: this match is not exhaustive and throws a MatchError
// for any status other than the three handled here
def mapStatusToInt(status: String): Int = {
  status match {
    case "APPROVED" => 1
    case "PENDING"  => 0
    case "REJECTED" => -1
  }
}

val newDs = ds.map { form: Form =>
  NewForm(form.form_info, form.element.map(
    status => NewStatus(status.id, status.status, mapStatusToInt(status.status))
  ))
}

newDs.printSchema()
newDs.show(false)
Result:
root
|-- form_info: string (nullable = true)
|-- element: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- id: string (nullable = true)
| | |-- status: string (nullable = true)
| | |-- statusInt: integer (nullable = false)
+---------+------------------------------------------------------------+
|form_info|element                                                     |
+---------+------------------------------------------------------------+
|Form1    |[{id1, APPROVED, 1}, {id2, PENDING, 0}, {id3, REJECTED, -1}]|
+---------+------------------------------------------------------------+
I think it should be possible to apply a similar solution in PySpark with a row-level map; I'll update the answer if I figure it out.
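For reference, here's a minimal sketch of that idea in PySpark, mirroring the Scala example's shape (df is a hypothetical DataFrame with the same schema; note that PySpark DataFrames have no .map, so this drops to the underlying RDD):
from pyspark.sql import Row

# assumed mapping, mirroring mapStatusToInt above; unknown statuses become None
status_to_int = {'APPROVED': 1, 'PENDING': 0, 'REJECTED': -1}

def add_rating(row):
    # rebuild each nested struct with the extra statusInt field
    return Row(form_info=row.form_info,
               element=[Row(id=e.id, status=e.status,
                            statusInt=status_to_int.get(e.status))
                        for e in row.element])

new_df = df.rdd.map(add_rating).toDF()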
Hope this is helpful on the off chance that you can use Scala.