I am using PySpark to pipe an RDD out to an external process (stdin/stdout).
piped_rdd = rdd.pipe(exe_path)
When I examine the returned PipelinedRDD, all rows have been converted to strings:
["Row(ID='x123223=', FirstName='L', LastName='S')", "Row(ID='43454".....)]
Is it possible to convert these strings back into proper rows?
CodePudding user response:
We could use eval(). I tested the following, which seems to work on the sample data:
val_ls = [
    "Row(ID='x123223=', FirstName='L', LastName='S')",
    "Row(ID='x123224=', FirstName='K', LastName='P')",
]
def evalRow(theRowString):
    """
    Uses `eval()` to turn a "Row(...)" string back into a Row object.
    """
    # importing Row inside the function guarantees the name exists on the workers
    from pyspark.sql import Row
    return eval(theRowString)
spark.sparkContext.parallelize(val_ls).map(lambda k: evalRow(k)).collect()
# [Row(ID='x123223=', FirstName='L', LastName='S'),
# Row(ID='x123224=', FirstName='K', LastName='P')]
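For the original question, the same function should be usable directly on the piped RDD. A minimal sketch, assuming piped_rdd is the RDD returned by rdd.pipe(exe_path):
piped_rows = piped_rdd.map(evalRow)  # map the "Row(...)" strings back to Row objects
piped_rows.take(2)
# e.g. [Row(ID='x123223=', FirstName='L', LastName='S'), ...]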
I checked the type of the results:
set(spark.sparkContext.parallelize(val_ls).map(lambda k: type(evalRow(k))).collect())
# {pyspark.sql.types.Row}
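If the end goal is a DataFrame again, the recovered Row RDD can be fed back into Spark. A sketch, assuming an active SparkSession named spark and that all rows share the same fields:
rows_rdd = spark.sparkContext.parallelize(val_ls).map(evalRow)
df = spark.createDataFrame(rows_rdd)  # schema inferred from the Row fields
df.printSchema()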
Initially I was just importing Row at the top level (without wrapping it in a function) and used eval() directly in the map():
from pyspark.sql import Row
spark.sparkContext.parallelize(val_ls).map(lambda k: eval(k)).collect()
I kept hitting the error NameError: name 'Row' is not defined. I think it is because Row is not imported on the workers, so they do not know how to evaluate eval("Row(...)").
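One alternative that should also avoid the NameError (a sketch, not something I tested as part of the answer above) is to hand Row to eval() explicitly as its globals, so the reference is shipped with the lambda instead of relying on the worker's module namespace:
from pyspark.sql import Row

# passing an explicit globals dict means eval() does not look up Row in the
# worker's own namespace; the captured reference travels with the closure
spark.sparkContext.parallelize(val_ls).map(lambda k: eval(k, {"Row": Row})).collect()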