I am new to PySpark and Spark in general. I would like to apply a transformation to a given column in a DataFrame, essentially calling a function on each value in that specific column.
I have a DataFrame, df, that looks like this:
df.show()
+-------+--------------------+
|version|                body|
+-------+--------------------+
|      1|9gIAAAASAQAEAAAAA...|
|      2|2gIAAAASAQAEAAAAA...|
|      3|3gIAAAASAQAEAAAAA...|
|      1|7gIAKAASAQAEAAAAA...|
+-------+--------------------+
I need to read the value of the body column for each row where version is 1 and then decrypt it (I have my own function that takes a string and returns the decrypted string). Finally, I want to write the decrypted values in CSV format to an S3 bucket.
def decrypt(encrypted_string: str) -> str:
    # my own logic that returns the decrypted string
    ...
When I run the following, I get the filtered values to which I need to apply my decrypt function:
from pyspark.sql.functions import col

df.where(col('version') == '1') \
  .select(col('body')).show()
+--------------------+
|                body|
+--------------------+
|9gIAAAASAQAEAAAAA...|
|7gIAKAASAQAEAAAAA...|
+--------------------+
However, I am not sure how to do that. I tried collect(), but that pulls all the rows onto the driver and defeats the purpose of using Spark.
I also tried .rdd.map as follows, but that did not work:
df.where(col('version') == '1') \
  .select(col('body')) \
  .rdd.map(lambda x: decrypt).toDF().show()
# or, with this as the last step instead:
  .rdd.map(decrypt).toDF().show()
Could someone please help with this?
Answer:
Please try:
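from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

# Wrap the Python function so Spark can call it on each value of the column
decrypt_udf = udf(decrypt, StringType())
df.where(col('version') == '1').withColumn('body', decrypt_udf('body'))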
Answer:
I got a clue from this post: Pyspark DataFrame UDF on Text Column.
It looks like I can simply do it with the following. Earlier I was calling my function without wrapping it in a udf, which is why it wasn't working.
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

dummy_function_udf = udf(decrypt, StringType())
df.where(col('version') == '1') \
  .select(col('body')) \
  .withColumn('decryptedBody', dummy_function_udf('body')) \
  .show()