There are two dataframes: one is an info table and the other is a reference table. I need to multiply two columns based on a condition; here are the details:
Dataframe (Info)
+----+-----+
| key|value|
+----+-----+
|   a|   10|
|   b|   20|
|   c|   50|
|   d|   40|
+----+-----+
Dataframe (Reference)
+----+----------+
| key|percentage|
+----+----------+
|   a|       0.1|
|   b|       0.5|
+----+----------+
Dataframe (this is the output I want)
+----+------+
| key|result|
+----+------+
|   a|     1|   (10 * 0.1 = 1)
|   b|    10|   (20 * 0.5 = 10)
|   c|    50|   (no matching key in the reference table, so the value stays the same)
|   d|    40|   (no matching key in the reference table, so the value stays the same)
+----+------+
I have tried the code below, but it failed.
df_cal = (
    info
    .withColumn('result', f.when(f.col('key') == reference.withColumn(f.col('key')),
                                 f.col('value') * reference.withColumn(f.col('percentage'))))
    .select('key', 'result')
)
df_cal.show()
CodePudding user response:
Join and multiply. Code and logic below:
from pyspark.sql.functions import broadcast, col

new_info = (info.join(broadcast(reference), on='key', how='left')  # join the two dataframes on key
            .na.fill(1.0)  # fill missing percentages with 1 so unmatched keys keep their value
            .withColumn('result', col('value') * col('percentage'))  # multiply the columns into result
            .drop('value', 'percentage')  # drop the unwanted columns
            )
new_info.show()
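For the sample data in the question, new_info.show() should print something like this (row order may vary):
+---+------+
|key|result|
+---+------+
|  a|   1.0|
|  b|  10.0|
|  c|  50.0|
|  d|  40.0|
+---+------+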
CodePudding user response:
A slight difference from wwnde's solution, with the overall logic remaining the same, would be to use coalesce instead of fillna. fillna, if used without a subset, can fill unwanted columns as well, and in any case it generates a new projection in the Spark plan.
An example using coalesce:
import pyspark.sql.functions as func

# data1_sdf / data2_sdf are the question's info and reference dataframes
data1_sdf. \
    join(data2_sdf, ['key'], 'left'). \
    withColumn('result',
               func.coalesce(func.col('value') * func.col('percentage'), func.col('value'))
               ). \
    show()
# +---+-----+----------+------+
# |key|value|percentage|result|
# +---+-----+----------+------+
# |  d|   40|      null|  40.0|
# |  c|   50|      null|  50.0|
# |  b|   20|       0.5|  10.0|
# |  a|   10|       0.1|   1.0|
# +---+-----+----------+------+
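If you want the output to contain only the key and result columns shown in the question, a select (or a drop of value and percentage) can be appended; a minimal sketch of that variant:
data1_sdf. \
    join(data2_sdf, ['key'], 'left'). \
    withColumn('result',
               func.coalesce(func.col('value') * func.col('percentage'), func.col('value'))
               ). \
    select('key', 'result'). \
    show()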
CodePudding user response:
If you are willing to use Spark SQL instead of the DataFrame API, you can take this approach:
Create dataframes. (Optional since you already have the data)
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType
# create info dataframe
info_data = [
("a",10),
("b",20),
("c",50),
("d",40),
]
info_schema = StructType([
StructField("key",StringType()),
StructField("value",IntegerType()),
])
info_df = spark.createDataFrame(data=info_data,schema=info_schema)
# create reference dataframe
reference_data = [
("a",.1),
("b",.5)
]
reference_schema = StructType([
StructField("key",StringType()),
StructField("percentage",FloatType()),
])
reference_df = spark.createDataFrame(data=reference_data,schema=reference_schema)
reference_df.show()
Next we need to create views of the two dataframes in order to run SQL queries. Below we create a view called info from info_df and a view called reference from reference_df.
# create views: info and reference
info_df.createOrReplaceTempView("info")
reference_df.createOrReplaceTempView("reference")
Finally we write a query to perform the multiplication. The query performs a left join between info and reference and then multiplies value by percentage. The key part is that we coalesce percentage with 1, so if percentage is null, value is multiplied by 1.
# coalesce is used inside the SQL string below, so this import is not strictly required
from pyspark.sql.functions import coalesce
my_query = """
select
i.key,
  -- coalesce the percentage with 1. If percentage is null then it gets replaced by 1
i.value * coalesce(r.percentage,1) as result
from info i
left join reference r
on i.key = r.key
"""
final_df = spark.sql(my_query)
final_df.show()
Output:
+---+------+
|key|result|
+---+------+
|  a|   1.0|
|  b|  10.0|
|  c|  50.0|
|  d|  40.0|
+---+------+
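The question's desired output shows whole numbers, whereas the join produces doubles. If integer results matter, one option (my assumption, not part of the original answer) is to cast inside the query, for example:
my_query = """
select
  i.key,
  -- cast assumes you want integer results like the example output in the question
  cast(i.value * coalesce(r.percentage, 1) as int) as result
from info i
left join reference r
  on i.key = r.key
"""
spark.sql(my_query).show()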