Problem Setup
Given a data frame where each row is a struct with an answers
field that holds an array of answer structs, each with multiple fields, the following code is supposed to process each answer in the array, by examining its render
field and applying some process to it. (Note this code is for an AWS Glue notebook running Glue 3.0, but aside from the spark context creation it should work on any PySpark >= 3.1):
%glue_version 3.0
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import pyspark.sql.functions as F
import pyspark.sql.types as T
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
test_schema = T.StructType([
T.StructField('question_id', T.StringType(), True),
T.StructField('subject', T.StringType(), True),
T.StructField('answers', T.ArrayType(
T.StructType([
T.StructField('render', T.StringType(), True),
T.StructField('encoding', T.StringType(), True),
T.StructField('misc_info', T.StructType([
T.StructField('test', T.StringType(), True)
]), True)
]), True), True)
])
json_df = spark.createDataFrame(data=[
[1, "maths", [("[tex]a1[/tex]", "text", ("x",)),("a2", "text", ("y",))]],
[2, "bio", [("b1", "text", ("z",)),("<p>b2</p>", "text", ("q",))]],
[3, "physics", None]
], schema=test_schema)
json_df.show(truncate=False)
json_df.printSchema()
resulting in:
----------- ------- ---------------------------------------------
|question_id|subject|answers |
----------- ------- ---------------------------------------------
|1 |maths |[{[tex]a1[/tex], text, {x}}, {a2, text, {y}}]|
|2 |bio |[{b1, text, {z}}, {<p>b2</p>, text, {q}}] |
|3 |physics|null |
----------- ------- ---------------------------------------------
root
|-- question_id: string (nullable = true)
|-- subject: string (nullable = true)
|-- answers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- render: string (nullable = true)
| | |-- encoding: string (nullable = true)
| | |-- misc_info: struct (nullable = true)
| | | |-- test: string (nullable = true)
Then taking the text processing method:
import re
@F.udf(returnType=T.StringType())
def determine_encoding_udf(render):
if render:
if "[tex]" in render:
return "tex"
match = re.search(r"<[^>]*>", render)
if match:
return "html"
else:
return "null render"
return "text"
Apply the transformation:
def normalize_answer(answer):
return answer.withField(
"processed_render_input",
answer.getField("render")
).withField(
"encoding",
determine_encoding_udf(answer.getField("render"))
)
json_mod_df = json_df.withColumn(
"answers",
F.transform("answers", normalize_answer)
)
json_mod_df.show(truncate=False)
Resulting in:
----------- ------- ------------------------------------------------------------------------------
|question_id|subject|answers |
----------- ------- ------------------------------------------------------------------------------
|1 |maths |[{[tex]a1[/tex], null render, {x}, [tex]a1[/tex]}, {a2, null render, {y}, a2}]|
|2 |bio |[{b1, null render, {z}, b1}, {<p>b2</p>, null render, {q}, <p>b2</p>}] |
|3 |physics|null |
----------- ------- ------------------------------------------------------------------------------
process_text
is complex so this entire process can't be expressed in a lambda where the transform is defined.
The Problem
The problem is that when I run this on a much larger answers data set, processed_text_input
is completely different text to the encoding
field, i.e. what's in encoding
seems to come from processing a completely different answer struct, from a different array from some other item in the input data frame. Sometimes it's also missing all together. In this toy example the text just seems to be missing, resulting in "null render" appearing in the encoding column for all examples. I tried adding 100 rows but in all cases the text was missing. I'm not sure why in my full version of this code with ~50k rows I get text from random rows. At any rate no text as shown in this toy problem isn't what I want either. The value in processed_text_input
is the correct value from the corresponding answers text field.
Is this a bug or should I be structuring this expression differently? I know that I could explode the answers array and process it more traditionally, but I'm trying to use the more recent transform
and withField
functions.
Makeshift Solution
This isn't an answer per se, but it gets a solution so I'm posting it for anyone having the same problem. For a post to be awarded as the solution it would need to show how to achieve the result I want using transformations, this uses the explode function but it covers a non-trivial data structure. I've added this solution as an answer below, for those who like me skip straight to the solutions.
CodePudding user response:
It will help if you can share more info like sample dataset.
I tried to recreate your problem; but udf
call inside the lambda function for array -> transform
results in some invalid state where probably the function arguments are gone after evaluation. I wonder, how come your code is running fine.
Anyways, there is an alternative if you can explode the answers
array and apply transformations on individual answer
struct. This way you get benefit of Spark's distributed processing, too.
import pyspark.sql.functions as F
import pyspark.sql.types as T
json_df = spark.createDataFrame(data=[[[("txt1",),("txt2",),("txt3",)]]], schema="answers array<struct<text string>>")
json_df.show(truncate=False)
json_df.printSchema()
@F.udf(returnType=T.StringType())
def process_text(text):
return "foo: " text
def normalize_answer(answer):
return answer.withField(
"processed_text",
process_text(answer.getField("text"))
).withField(
"processed_text_input",
answer.getField("text")
)
json_df = json_df.withColumn("answers", F.explode("answers"))
json_df = json_df.withColumn("processed_text", F.col("answers").getField("text"))
json_df = json_df.withColumn("processed_text_input", process_text(F.col("answers").getField("text")))
json_df.show(truncate=False)
Sample dataset used is array with 3 text literals:
------------------------
|answers |
------------------------
|[{txt1}, {txt2}, {txt3}]|
------------------------
root
|-- answers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- text: string (nullable = true)
Output:
------- -------------- --------------------
|answers|processed_text|processed_text_input|
------- -------------- --------------------
|{txt1} |txt1 |foo: txt1 |
|{txt2} |txt2 |foo: txt2 |
|{txt3} |txt3 |foo: txt3 |
------- -------------- --------------------
CodePudding user response:
As hinted in the question, I have a makeshift solution. This is related to Azhar's answer where they suggest using the explode function, but ultimately I was looking for an answer that uses transformations and already had a sense of solving this problem using the explode function. So here's a full answer using the explode function, which processes the non-trivial data structure in the question, modifying the encoding value for each answer based on the corresponding render field, then collapses the data back into it's original structure:
json_mod_df = json_df.withColumn(
"answers",
F.explode_outer("answers")
).withColumn(
"answers",
F.col("answers").withField(
"encoding",
determine_encoding_udf(
F.col("answers").getField("render")
)
)
).groupBy(
"question_id", "subject"
).agg(
F.collect_list("answers").alias("answers")
)
json_mod_df.show(truncate=False)
json_df.printSchema()
Resulting in:
----------- ------- --------------------------------------------
|question_id|subject|answers |
----------- ------- --------------------------------------------
|2 |bio |[{b1, text, {z}}, {<p>b2</p>, html, {q}}] |
|1 |maths |[{[tex]a1[/tex], tex, {x}}, {a2, text, {y}}]|
|3 |physics|[] |
----------- ------- --------------------------------------------
root
|-- question_id: string (nullable = true)
|-- subject: string (nullable = true)
|-- answers: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- render: string (nullable = true)
| | |-- encoding: string (nullable = true)
| | |-- misc_info: struct (nullable = true)
| | | |-- test: string (nullable = true)
Note that in my full requirements beyond the scope of this question, I also needed to modify field's within the misc_info struct, for the above example, this can be achieved with:
.withColumn(
"answers",
F.col("answers").withField(
"misc_info.test",
F.col("answers").getField("misc_info").getField("test")
)
)
In my actual code I'm doing the following:
.withColumn(
"answers",
F.col("answers").withField(
"misc_info.comments",
F.coalesce(F.col("answers").getField("misc_info").getField("comments"), F.array())
)
)