Spark / Python : cannot pickle '_thread.RLock' object when passing multiple columns to UDF

Time:02-15

In a Jupyter notebook, I have two DataFrames:

ApplePrices:

+--------------------+-------+---------+--------+
|          Model_name|  Price|  Country|Currency|
+--------------------+-------+---------+--------+
|        24-inch iMac|1919.01|Australia|     AUD|
|         AirPods Max| 908.47|Australia|     AUD|
|         AirPods Pro|  403.2|Australia|     AUD|
|AirPods(2nd gener...| 221.31|Australia|     AUD|
|AirPods(3rd gener...| 281.94|Australia|     AUD|
|Apple Pencil (2nd...|  201.1|Australia|     AUD|
|         Apple TV 4K| 251.62|Australia|     AUD|
|      Apple Watch SE| 433.52|Australia|     AUD|
|Apple Watch Series 3| 302.15|Australia|     AUD|
|         MacBook Air| 1514.8|Australia|     AUD|
|         Magic Mouse| 110.15|Australia|     AUD|
|          Sport Band|  69.73|Australia|     AUD|
|                iPad| 504.26|Australia|     AUD|
|            iPad Pro|1211.64|Australia|     AUD|
|           iPhone 12|1009.53|Australia|     AUD|
|           iPhone 13|1211.64|Australia|     AUD|
|           iPhone SE| 686.16|Australia|     AUD|
|        24-inch iMac|1597.97|   Canada|     CAD|
|         AirPods Max|  778.5|   Canada|     CAD|
|         AirPods Pro| 328.79|   Canada|     CAD|
+--------------------+-------+---------+--------+

currencyConversion (ratio: 1 USD = xx units of Currency):

+--------+--------------------+
|ISO_4217|Dollar_To_Curr_Ratio|
+--------+--------------------+
|     EUR|                0.89|
|     CAD|                1.27|
|     CZK|                21.7|
|     DKK|                6.59|
|     HUF|              319.05|
|     INR|               74.42|
|     MXN|                20.5|
|     NOK|                 8.9|
|     PHP|               51.22|
|     PLN|                4.03|
|     RUB|               76.28|
|     SEK|                9.15|
|     THB|               33.57|
|     USD|                 1.0|
|     AUD|                 1.4|
+--------+--------------------+

My goal is to convert all prices in ApplePrices to US dollars, so I wrote a UDF:

def convertPriceToDollar(price, currency):
    ratio = currencyConversion.select("Dollar_To_Curr_Ratio").where(col("ISO_4217") == currency)
    return price * ratio
toDollarConverter = udf(convertPriceToDollar)

Then I call the function with both columns:

applePrices.select(toDollarConverter('Price', 'Currency').alias("Price", "truc")).show()

But I get this error:

PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object

It works fine if I pass just one column to the function, like this:

def convertPriceToDollar(price):
    return price * 1.27
toDollarConverter = udf(convertPriceToDollar)

applePrices.select(toDollarConverter('Price').alias("Price")).show()

Results:

+------------------+
|             Price|
+------------------+
|         2437.1427|
|         1153.7569|
|           512.064|
|          281.0637|
|          358.0638|
|           255.397|
|319.55740000000003|
|          550.5704|
|383.73049999999995|
|          1923.796|
|          139.8905|
|           88.5571|
|          640.4102|
|1538.7828000000002|
|         1282.1031|
|1538.7828000000002|
|          871.4232|
|         2029.4219|
|           988.695|
|          417.5633|
+------------------+
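The one-column version works because its body no longer references `currencyConversion`, not because it takes fewer columns. PySpark ships a UDF to the executors by serializing the Python function together with everything it references (PySpark uses cloudpickle for this). `convertPriceToDollar` references a DataFrame, which is tied to the driver's Spark session and its JVM connection, and that object graph contains thread locks, which cannot be pickled; a DataFrame also cannot be queried from code running on the executors. The stdlib-only sketch below reproduces the same `TypeError`. `session_like_state` is a hypothetical stand-in for that driver-side state, not a real Spark object.

```python
import pickle
import threading


def pickling_error(obj):
    """Return the error message pickle raises for obj, or None if it pickles."""
    try:
        pickle.dumps(obj)
    except (TypeError, pickle.PicklingError) as exc:
        return str(exc)
    return None


# Hypothetical stand-in for what the UDF's closure drags along when it
# references a DataFrame: live session state that includes a thread lock.
session_like_state = {"name": "currencyConversion", "lock": threading.RLock()}

# Plain data, such as a dict of conversion ratios, serializes without trouble.
plain_ratios = {"AUD": 1.4, "CAD": 1.27}

print(pickling_error(session_like_state))  # message mentions _thread.RLock
print(pickling_error(plain_ratios))        # None
```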

Answer:

Do you need a UDF at all? A join against the conversion table does this with built-in column functions.

Example datasets:

# Prices and ratios are floats so the division below is numeric.
ApplePrices = spark.createDataFrame(
    [
        ('24-inch iMac', 1919.01, 'Australia', 'AUD'),
        ('AirPods Max', 908.47, 'Australia', 'AUD'),
        ('24-inch iMac', 1597.97, 'Canada', 'CAD'),
    ],
    ['Model_name', 'Price', 'Country', 'Currency'],
)

currencyConversion = spark.createDataFrame(
    [
        ('AUD', 1.4),
        ('CAD', 1.27),
    ],
    ['ISO_4217', 'Dollar_To_Curr_Ratio'],
)

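Using the DataFrame API (a left join on the currency code, then a division):

from pyspark.sql import functions as F

# Rename ISO_4217 so both DataFrames share the 'Currency' join key, then divide
# the local price by the units-per-dollar ratio to get the USD price.
ApplePrices_mod = (
    ApplePrices
    .join(currencyConversion.withColumnRenamed('ISO_4217', 'Currency'), 'Currency', 'left')
    .withColumn('Price_USD', F.round(F.col('Price') / F.col('Dollar_To_Curr_Ratio'), 2))
    .select('Currency', 'Model_name', 'Price_USD', 'Country')
)

ApplePrices_mod.show()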
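`Dollar_To_Curr_Ratio` counts units of local currency per US dollar, so the code divides by it; multiplying, as the question's one-column test does with 1.27, converts in the wrong direction. For the first Australian row:

$$
\mathrm{Price}_{\mathrm{USD}} = \frac{\mathrm{Price}}{\mathrm{Dollar\_To\_Curr\_Ratio}} = \frac{1919.01}{1.4} \approx 1370.72
$$

Likewise 1597.97 / 1.27 ≈ 1258.24 for the Canadian iMac, which agrees with the output.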

Output:

+--------+------------+---------+---------+
|Currency|  Model_name|Price_USD|  Country|
+--------+------------+---------+---------+
|     AUD|24-inch iMac|  1370.72|Australia|
|     AUD| AirPods Max|   648.91|Australia|
|     CAD|24-inch iMac|  1258.24|   Canada|
+--------+------------+---------+---------+
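If a UDF really is required (for logic that built-in functions cannot express), a two-argument UDF works once the function stops referencing a DataFrame. A minimal sketch, assuming the conversion table is small enough to collect to the driver: the ratios live in a plain dict (hard-coded here from the question's table), and the function divides by the ratio. `RATIOS` and `convert_price_to_dollar` are hypothetical names, not part of the original code.

```python
# Hypothetical lookup table. In Spark it could be built once on the driver with
#   RATIOS = {r["ISO_4217"]: float(r["Dollar_To_Curr_Ratio"])
#             for r in currencyConversion.collect()}
# The values below are copied from the currencyConversion table in the question.
RATIOS = {"AUD": 1.4, "CAD": 1.27, "EUR": 0.89, "USD": 1.0}


def convert_price_to_dollar(price, currency):
    """Convert a local price to USD using units-of-currency-per-dollar ratios.

    The function only refers to the plain dict RATIOS, so it can be serialized
    and shipped to executors. Returns None (null in Spark) when the price or
    the currency is missing from the table.
    """
    ratio = RATIOS.get(currency)
    if price is None or ratio is None:
        return None
    # 1 USD = ratio units of local currency, so divide to get dollars.
    return round(float(price) / ratio, 2)
```

In PySpark it would be wrapped with `udf` from `pyspark.sql.functions`, passing `DoubleType()` from `pyspark.sql.types` as the return type (`udf` defaults to a string return type otherwise), and then called with both columns, e.g. `applePrices.select(udf(convert_price_to_dollar, DoubleType())('Price', 'Currency').alias('Price_USD'))`.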