I have a dataframe with several price columns per customer and would like to pick the relevant price based on the value in the price_point column. My dataframe looks like this:
customer | price_point | price_a | price_b | price_c |
---|---|---|---|---|
A | price_b | 1 | 2 | 3 |
B | price_a | 1 | 2 | 3 |
C | price_c | 1 | 2 | 3 |
and I would expect something like the following:
customers = customers.withColumn("final_price", col(col("price_point")))
To return:
customer | price_point | price_a | price_b | price_c | final_price |
---|---|---|---|---|---|
A | price_b | 1 | 2 | 3 | 2 |
B | price_a | 1 | 2 | 3 | 1 |
C | price_c | 1 | 2 | 3 | 3 |
However, I get an error that the column is not iterable. Is it possible in Spark to use the value of a column dynamically?
CodePudding user response:
One way is to use a UDF as follows:
from pyspark.sql import functions as F
from pyspark.sql import types as T
@F.udf(T.IntegerType())
def get_price(price_point, price_a, price_b, price_c):
    # Map each column name to its value, then look up the one named by price_point
    prices = {
        "price_a": price_a,
        "price_b": price_b,
        "price_c": price_c,
    }
    return prices.get(price_point)

(
    df.withColumn(
        "final_price",
        get_price("price_point", "price_a", "price_b", "price_c")
    )
    .show()
)
Also pay attention to the output signature in @F.udf(T.IntegerType()): if your prices are floats, use @F.udf(T.FloatType()), and so on.
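Since the UDF body is plain Python, one option is to keep the lookup as a standalone function so it can be unit-tested without a SparkSession, and wrap it with F.udf afterwards. A minimal sketch (the lookup_price name is mine, not from the answer above):

```python
def lookup_price(price_point, price_a, price_b, price_c):
    """Core lookup used by the UDF: return the price named by price_point."""
    prices = {"price_a": price_a, "price_b": price_b, "price_c": price_c}
    return prices.get(price_point)

# Quick sanity checks against the sample rows from the question
assert lookup_price("price_b", 1, 2, 3) == 2
assert lookup_price("price_a", 1, 2, 3) == 1
assert lookup_price("price_c", 1, 2, 3) == 3
```

You would then register it with get_price = F.udf(lookup_price, T.IntegerType()) and use it exactly as above.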
Hope this helps :)
CodePudding user response:
If there is only a limited number of price columns, then a case when expression should work fine (expr comes from pyspark.sql.functions):
customers.withColumn("final_price", expr("case when price_point = 'price_a' then price_a when price_point = 'price_b' then price_b else price_c end"))
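If the set of price columns grows, the same case when string can be generated programmatically instead of written by hand. A small sketch, assuming the helper name build_case_expr and the default selector column price_point (both are mine, not from the answer):

```python
def build_case_expr(price_cols, selector="price_point"):
    """Build a CASE WHEN SQL string that picks the column named by `selector`.

    Intended to be passed to pyspark.sql.functions.expr.
    """
    clauses = " ".join(f"when {selector} = '{c}' then {c}" for c in price_cols)
    return f"case {clauses} else null end"

case_expr = build_case_expr(["price_a", "price_b", "price_c"])
# customers.withColumn("final_price", expr(case_expr))
```

This keeps the logic in native Spark SQL (no Python UDF overhead) while staying maintainable as columns are added.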