Using a Jupyter notebook with PySpark, I have two DataFrames:
ApplePrices:
+--------------------+-------+---------+--------+
|          Model_name|  Price|  Country|Currency|
+--------------------+-------+---------+--------+
|        24-inch iMac|1919.01|Australia|     AUD|
|         AirPods Max| 908.47|Australia|     AUD|
|         AirPods Pro|  403.2|Australia|     AUD|
|AirPods(2nd gener...| 221.31|Australia|     AUD|
|AirPods(3rd gener...| 281.94|Australia|     AUD|
|Apple Pencil (2nd...|  201.1|Australia|     AUD|
|         Apple TV 4K| 251.62|Australia|     AUD|
|      Apple Watch SE| 433.52|Australia|     AUD|
|Apple Watch Series 3| 302.15|Australia|     AUD|
|         MacBook Air| 1514.8|Australia|     AUD|
|         Magic Mouse| 110.15|Australia|     AUD|
|          Sport Band|  69.73|Australia|     AUD|
|                iPad| 504.26|Australia|     AUD|
|            iPad Pro|1211.64|Australia|     AUD|
|           iPhone 12|1009.53|Australia|     AUD|
|           iPhone 13|1211.64|Australia|     AUD|
|           iPhone SE| 686.16|Australia|     AUD|
|        24-inch iMac|1597.97|   Canada|     CAD|
|         AirPods Max|  778.5|   Canada|     CAD|
|         AirPods Pro| 328.79|   Canada|     CAD|
+--------------------+-------+---------+--------+
currencyConversion (ratio: 1 USD = Dollar_To_Curr_Ratio units of the currency):
+--------+--------------------+
|ISO_4217|Dollar_To_Curr_Ratio|
+--------+--------------------+
|     EUR|                0.89|
|     CAD|                1.27|
|     CZK|                21.7|
|     DKK|                6.59|
|     HUF|              319.05|
|     INR|               74.42|
|     MXN|                20.5|
|     NOK|                 8.9|
|     PHP|               51.22|
|     PLN|                4.03|
|     RUB|               76.28|
|     SEK|                9.15|
|     THB|               33.57|
|     USD|                 1.0|
|     AUD|                 1.4|
+--------+--------------------+
My goal is to convert all prices in ApplePrices to US dollars, so I wrote a UDF:
from pyspark.sql.functions import col, udf

def convertPriceToDollar(price, currency):
    # Look up the ratio for this currency in the currencyConversion DataFrame
    ratio = currencyConversion.select("Dollar_To_Curr_Ratio").where(col("ISO_4217") == currency)
    return price * ratio

toDollarConverter = udf(convertPriceToDollar)
Then I call it in a select, passing both columns:
applePrices.select(toDollarConverter('Price', 'Currency').alias("Price")).show()
But I get this error: PicklingError: Could not serialize object: TypeError: cannot pickle '_thread.RLock' object
It works fine if I pass only one column and hard-code the ratio:
def convertPriceToDollar(price):
    return price * 1.27

toDollarConverter = udf(convertPriceToDollar)
applePrices.select(toDollarConverter('Price').alias("Price")).show()
Result:
+------------------+
|             Price|
+------------------+
|         2437.1427|
|         1153.7569|
|           512.064|
|          281.0637|
|          358.0638|
|           255.397|
|319.55740000000003|
|          550.5704|
|383.73049999999995|
|          1923.796|
|          139.8905|
|           88.5571|
|          640.4102|
|1538.7828000000002|
|         1282.1031|
|1538.7828000000002|
|          871.4232|
|         2029.4219|
|           988.695|
|          417.5633|
+------------------+
Answer:
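Do you need a UDF at all? Joining the prices with the conversion table and dividing by the ratio does the conversion with built-in column functions.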
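If you do want a UDF, it helps to see why this one fails. To run a Python UDF, Spark pickles the function (PySpark uses cloudpickle for this) and ships it to the executors, and the objects the function references are serialized along with it. convertPriceToDollar references the currencyConversion DataFrame, and judging by the error, that DataFrame's session/JVM handles contain a thread lock, which pickle cannot serialize. (A DataFrame could not be queried inside a UDF anyway: the function runs on executors, where no SparkSession is available.) The hard-coded 1.27 version works because a plain float pickles without trouble. The same TypeError can be reproduced in plain Python; the dict holding an RLock is only a stand-in for the DataFrame's internals, and pickling_error is a hypothetical helper:

```python
import pickle
import threading


def pickling_error(obj) -> str:
    """Return the TypeError message raised when pickling obj, or '' if it pickles."""
    try:
        pickle.dumps(obj)
    except TypeError as err:
        return str(err)
    return ""


# Stand-in for an object graph (like a DataFrame's session/JVM handles) that holds a lock.
lock_holder = {"_lock": threading.RLock()}
# Plain data, which is what a UDF can safely capture instead.
plain_rates = {"AUD": 1.4, "CAD": 1.27}

lock_msg = pickling_error(lock_holder)
plain_msg = pickling_error(plain_rates)
print(lock_msg)          # the RLock makes pickling fail
print(repr(plain_msg))   # empty string: the dict pickled without error
```

Capturing plain data such as a dict of rates, instead of the DataFrame, avoids the problem.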
Example datasets:
ApplePrices = spark.createDataFrame(
[
('24-inch iMac','1919.01','Australia','AUD')
,('AirPods Max','908.47','Australia','AUD')
,('24-inch iMac','1597.97','Canada','CAD')
], ['Model_name', 'Price', 'Country', 'Currency']
)
currencyConversion = spark.createDataFrame(
[
('AUD','1.4')
,('CAD','1.27')
], ['ISO_4217', 'Dollar_To_Curr_Ratio']
)
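Because Dollar_To_Curr_Ratio is the number of currency units per 1 USD, a local price converts to USD by dividing by the ratio, not multiplying as the UDF in the question does. The arithmetic in plain Python, using values copied from the tables above (dollar_to_curr_ratio and to_usd are illustrative names, not from the original code):

```python
# Units of local currency per 1 USD, copied from currencyConversion.
dollar_to_curr_ratio = {"AUD": 1.4, "CAD": 1.27, "USD": 1.0}


def to_usd(price: float, currency: str) -> float:
    """Convert a local-currency price to USD, rounded to cents."""
    return round(price / dollar_to_curr_ratio[currency], 2)


print(to_usd(1919.01, "AUD"))  # 1370.72
print(to_usd(1597.97, "CAD"))  # 1258.24
```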
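Using the DataFrame API (a join instead of a UDF):
from pyspark.sql import functions as F

# Rename ISO_4217 so both DataFrames share a 'Currency' key, then left-join on it
# (rows whose currency is missing from currencyConversion get a null Price_USD).
# Dollar_To_Curr_Ratio is units of currency per 1 USD, so dividing gives USD.
ApplePrices_mod = ApplePrices\
    .join(currencyConversion.withColumnRenamed('ISO_4217', 'Currency'), 'Currency', 'left')\
    .withColumn('Price_USD', F.round(F.col('Price') / F.col('Dollar_To_Curr_Ratio'), 2))\
    .select('Currency', 'Model_name', 'Price_USD', 'Country')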
ApplePrices_mod.show()
Output:
+--------+------------+---------+---------+
|Currency|  Model_name|Price_USD|  Country|
+--------+------------+---------+---------+
|     AUD|24-inch iMac|  1370.72|Australia|
|     AUD| AirPods Max|   648.91|Australia|
|     CAD|24-inch iMac|  1258.24|   Canada|
+--------+------------+---------+---------+