I have a dataframe with two columns. The first is a column of unique IDs and the second is a colon-delimited list of student scores (this is after loading it from a CSV without headers).
Is there any mechanism to convert the second column to a list of structs for further processing? Or to a dynamic number of additional columns? I just need a way to do additional processing on the scores for each id, e.g. calculate the mean for id 0000000003, which can't be done in the current data format.
I.e.
+----------+-----------------------------+
|id        |scores                       |
+----------+-----------------------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|
|0783563078|chris,1.0                    |
|0783801254|michelle,1.0:vixon,2.3       |
+----------+-----------------------------+
into
+----------+--------------------------------------------------------------------------+
|id        |scores                                                                    |
+----------+--------------------------------------------------------------------------+
|0000000003|[{student -> brian, score -> 1.0}, {student -> steve, score -> 2.3}, .... |
+----------+--------------------------------------------------------------------------+
or potentially something like this:
+----------+--------+------+--------+------+------+
|id        |student1|score1|student2|score2|etc...|
+----------+--------+------+--------+------+------+
|0000000003|        |      |        |      |      |
+----------+--------+------+--------+------+------+
I am just not sure how to turn this data format into something that is processable.
CodePudding user response:
Approach 4 is possibly the shortest to get the average, but the other approaches allow you to extract the data as maps/structs.
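As a plain-Python reference point for what every approach below computes (sample data copied from the question; no Spark required):

```python
# Parse the colon/comma-delimited scores string and take the mean per id.
# Rows are the sample data from the question.
rows = [
    ("0000000003", "brian,1.0:steve,2.3:allie,8.0"),
    ("0783563078", "chris,1.0"),
    ("0783801254", "michelle,1.0:vixon,2.3"),
]

def score_avg(scores: str) -> float:
    # "brian,1.0:steve,2.3" -> [("brian", "1.0"), ("steve", "2.3")] -> mean
    pairs = [entry.split(",") for entry in scores.split(":")]
    values = [float(score) for _, score in pairs]
    return sum(values) / len(values)

averages = {id_: score_avg(s) for id_, s in rows}
```

Each Spark approach is one way of pushing this same split/parse/average into the DataFrame API.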
Approach 1
An easily accessible approach is to use str_to_map, which will convert your string value to a map. You could then use map_values to extract the scores, e.g.
# assumes: from pyspark.sql import functions as F
(
    df.withColumn(
        "score_map",
        F.expr("str_to_map(scores, ':', ',')")
    ).withColumn(
        "score_values",
        F.map_values("score_map")
    )
).show(truncate=False)
+----------+-----------------------------+------------------------------------------+---------------+
|id        |scores                       |score_map                                 |score_values   |
+----------+-----------------------------+------------------------------------------+---------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|{brian -> 1.0, steve -> 2.3, allie -> 8.0}|[1.0, 2.3, 8.0]|
|0783563078|chris,1.0                    |{chris -> 1.0}                            |[1.0]          |
|0783801254|michelle,1.0:vixon,2.3       |{michelle -> 1.0, vixon -> 2.3}           |[1.0, 2.3]     |
+----------+-----------------------------+------------------------------------------+---------------+
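For context, str_to_map(scores, ':', ',') splits on the pair delimiter first and then on the key/value delimiter within each pair; a plain-Python sketch of the same semantics (the helper name here is mine, not a Spark API):

```python
def str_to_map(s: str, pair_delim: str = ":", kv_delim: str = ",") -> dict:
    # Mirror of Spark SQL's str_to_map: split into pairs, then key/value.
    return dict(pair.split(kv_delim, 1) for pair in s.split(pair_delim))

m = str_to_map("brian,1.0:steve,2.3:allie,8.0")
# map_values corresponds to m.values(); Spark keeps the values as strings
values = [float(v) for v in m.values()]
```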
Since you are only interested in the average score, you may also use explode to split the array returned by map_values into multiple rows before aggregating with mean. In the example below, I have included the original column scores in the group by; you may remove it and achieve the same results in your application.
(
    df.withColumn(
        "score_values",
        F.explode(F.map_values(F.expr("str_to_map(scores, ':', ',')")))
    )
    .groupBy("id", "scores")  # you may remove "scores" to group only by id
    .agg(
        F.mean("score_values").alias("score_avg")
    )
).show(truncate=False)
+----------+-----------------------------+-----------------+
|id        |scores                       |score_avg        |
+----------+-----------------------------+-----------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|3.766666666666667|
|0783801254|michelle,1.0:vixon,2.3       |1.65             |
|0783563078|chris,1.0                    |1.0              |
+----------+-----------------------------+-----------------+
Approach 2
If you prefer to use a struct, you may use split, transform and named_struct in Spark SQL to convert your data into the desired struct, e.g.
df2 = (
    df.withColumn(
        "score_struct",
        F.expr("transform(split(scores, ':'), x -> named_struct('student', split(x, ',')[0], 'score', split(x, ',')[1]))")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- scores: string (nullable = true)
|-- score_struct: array (nullable = true)
| |-- element: struct (containsNull = false)
| | |-- student: string (nullable = true)
| | |-- score: string (nullable = true)
+----------+-----------------------------+------------------------------------------+
|id        |scores                       |score_struct                              |
+----------+-----------------------------+------------------------------------------+
|0000000003|brian,1.0:steve,2.3:allie,8.0|[{brian, 1.0}, {steve, 2.3}, {allie, 8.0}]|
|0783563078|chris,1.0                    |[{chris, 1.0}]                            |
|0783801254|michelle,1.0:vixon,2.3       |[{michelle, 1.0}, {vixon, 2.3}]           |
+----------+-----------------------------+------------------------------------------+
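In plain-Python terms, the transform/named_struct expression is just a per-entry record builder; a sketch of the equivalent logic (the function name is mine):

```python
def to_structs(scores: str) -> list:
    # Mirror of transform(split(scores, ':'), x -> named_struct(...)):
    # one {student, score} record per colon-delimited entry.
    # Scores stay as strings here, just as in the Spark schema above.
    out = []
    for entry in scores.split(":"):
        student, score = entry.split(",", 1)
        out.append({"student": student, "score": score})
    return out

structs = to_structs("brian,1.0:steve,2.3:allie,8.0")
```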
We can again use explode to split the list of values in each row into multiple rows before using mean to determine the average, e.g.
df2 = (
    df.withColumn(
        "score_struct",
        F.expr("explode(transform(split(scores, ':'), x -> named_struct('student', split(x, ',')[0], 'score', split(x, ',')[1])))")
    )
    .groupBy("id")
    .agg(
        F.mean("score_struct.score").alias("score_avg")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- score_avg: double (nullable = true)
+----------+-----------------+
|id        |score_avg        |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0              |
|0783801254|1.65             |
+----------+-----------------+
Approach 3
You could use approach 2 but extract only your desired values, i.e. the scores, before calculating the average, e.g.:
df2 = (
    df.withColumn(
        "score",
        F.expr("explode(transform(split(scores, ':'), x -> split(x, ',')[1]))")
    )
    .groupBy("id")
    .agg(
        F.mean("score").alias("score_avg")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- score_avg: double (nullable = true)
+----------+-----------------+
|id        |score_avg        |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0              |
|0783801254|1.65             |
+----------+-----------------+
Approach 4
This approach uses split and aggregate to compute the sum of the scores in each row before dividing by the number of entries to find the average.
df2 = (
    df.withColumn(
        "scores",
        F.split("scores", ":")
    )
    .withColumn(
        "scores",
        # the lambda folds the score portion of each entry into a running
        # sum; dividing by the array size gives the average
        F.expr("aggregate(scores, cast(0 as double), (acc, x) -> acc + split(x, ',')[1])") / F.size("scores")
    )
)
df2.printSchema()
df2.show(truncate=False)
root
|-- id: string (nullable = true)
|-- scores: double (nullable = true)
+----------+-----------------+
|id        |scores           |
+----------+-----------------+
|0000000003|3.766666666666667|
|0783563078|1.0              |
|0783801254|1.65             |
+----------+-----------------+
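The aggregate call is a fold over the split array; a plain-Python equivalent using functools.reduce (sketch, the function name is mine):

```python
from functools import reduce

def avg_via_fold(scores: str) -> float:
    entries = scores.split(":")
    # Mirror of aggregate(scores, cast(0 as double),
    #                     (acc, x) -> acc + split(x, ',')[1]):
    # accumulate the numeric score of each entry into a running sum.
    total = reduce(lambda acc, x: acc + float(x.split(",")[1]), entries, 0.0)
    # ... then divide by size(scores) for the average.
    return total / len(entries)
```

Spark performs the same string-to-double cast implicitly when adding `split(x, ',')[1]` to the double accumulator.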