PySpark: Transform values of given column in the DataFrame

Time:09-28

I am new to PySpark and Spark in general. I would like to apply a transformation to a given column in a DataFrame, essentially calling a function on each value of that specific column.

I have a DataFrame df that looks like this:

df.show()

+------------+--------------------+
|version     |         body       |
+------------+--------------------+
|           1|9gIAAAASAQAEAAAAA...|
|           2|2gIAAAASAQAEAAAAA...|
|           3|3gIAAAASAQAEAAAAA...|
|           1|7gIAKAASAQAEAAAAA...|
+------------+--------------------+

For each row where version is 1, I need to read the value of the body column and decrypt it (I have my own function that takes an encrypted string and returns the decrypted string). Finally, I need to write the decrypted values in CSV format to an S3 bucket.

def decrypt(encrypted_string: str) -> str:
    # code that returns the decrypted string
    ...

When I run the following, I get the filtered values to which I need to apply my decrypt function:

from pyspark.sql.functions import col

df.where(col('version') == '1')\
     .select(col('body')).show()

+--------------------+
|                body|
+--------------------+
|9gIAAAASAQAEAAAAA...|
|7gIAKAASAQAEAAAAA...|
+--------------------+

However, I am not sure how to apply the function to these values. I tried collect(), but that pulls every row back to the driver and defeats the purpose of using Spark.

I also tried .rdd.map as follows, but that did not work:

df.where(col('version') =='1')\
     .select(col('body'))\
     .rdd.map(lambda x: decrypt).toDF().show()

Or:

df.where(col('version') =='1')\
     .select(col('body'))\
     .rdd.map(decrypt).toDF().show()

Could someone please help with this?

Answer:

Please try:

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

decrypt_udf = udf(decrypt, StringType())
df.where(col('version') == '1').withColumn('body', decrypt_udf('body'))

Answer:

I got a clue from this post: Pyspark DataFrame UDF on Text Column. It looks like I can simply do the following. Earlier I was calling the function directly without wrapping it in a udf, which is why it wasn't working.

from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType

dummy_function_udf = udf(decrypt, StringType())

df.where(col('version') == '1')\
   .select(col('body')) \
   .withColumn('decryptedBody', dummy_function_udf('body')) \
   .show()
