What I have is two DataFrames, each representing a probability distribution, stored one entry per row. For example, one is df1:
item_id | probability
--------|---------------
item1 | 0.1
item2 | 0.2
item3 | 0.7
and another, let's call it df2:
item_id | probability
--------|---------------
item2 | 0.3
item3 | 0.5
item4 | 0.2
Please notice that the item spaces of the two are different. That is fine, because it simply means that df1 has zero probability for item4 and df2 has zero probability for item1. What I'd like is code, without heavy use of custom UDFs, that produces a DataFrame which, given some alpha double value, blends the two distributions. I can write this with custom UDFs, but I'm wondering if there's pure Spark SQL based code that does this with only built-in functions. The desired result would be:
item_id | probability
--------|---------------
item1 | 0.1 * alpha + 0.0 * (1 - alpha)
item2 | 0.2 * alpha + 0.3 * (1 - alpha)
item3 | 0.7 * alpha + 0.5 * (1 - alpha)
item4 | 0.0 * alpha + 0.2 * (1 - alpha)
CodePudding user response:
I think this is totally doable with SQL. The secret is an "outer" join with some coalesce magic to handle nulls.
import org.apache.spark.sql.functions._   // coalesce, lit
import spark.implicits._                   // toDF on RDDs of tuples

val someData = Seq( ("item1", 0.1), ("item2", 0.2), ("item3", 0.7) )
val alpha = 1.1
val someMoreData = Seq( ("item2", 0.3), ("item3", 0.5), ("item4", 0.2) )

// note: df1 here holds someMoreData and df2 holds someData,
// i.e. the naming is swapped relative to the question
val df1 = spark.sparkContext.parallelize(someMoreData).toDF("item_id", "probability")
val df2 = spark.sparkContext.parallelize(someData).toDF("item_id", "probability")

val prob = df2
  .join(df1, df1("item_id") === df2("item_id"), "outer")
  .select(
    coalesce(df1("item_id"), df2("item_id")).alias("item_id"),
    coalesce(df1("probability"), lit(0.0)).alias("probability1"),
    coalesce(df2("probability"), lit(0.0)).alias("probability2"),
    lit(alpha).alias("alpha") )
prob.show()
+-------+------------+------------+-----+
|item_id|probability1|probability2|alpha|
+-------+------------+------------+-----+
|  item3|         0.5|         0.7|  1.1|
|  item2|         0.3|         0.2|  1.1|
|  item1|         0.0|         0.1|  1.1|
|  item4|         0.2|         0.0|  1.1|
+-------+------------+------------+-----+
prob.select( prob("probability1") * prob("alpha") + prob("probability2").multiply( lit(1.0) - prob("alpha")), prob("item_id") ).show()
+---------------------------------------------------------+-------+
|((probability1 * alpha) + (probability2 * (1.0 - alpha)))|item_id|
+---------------------------------------------------------+-------+
|                                                     0.48|  item3|
|                                                     0.31|  item2|
|                                     -0.01000000000000...|  item1|
|                                      0.22000000000000003|  item4|
+---------------------------------------------------------+-------+
CodePudding user response:
It can be solved with a full join and a simple arithmetic operation. Here is a snippet. Just bear in mind that, for clarity, I renamed the probability columns in the two DataFrames to prob1 and prob2 respectively. Then it's just something like:
from pyspark.sql import functions as F

alpha = 0.5

df = df1 \
    .join(df2, how='full', on='item_id') \
    .fillna(0) \
    .withColumn('prob', alpha * F.col('prob1') + (1 - alpha) * F.col('prob2'))
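Since the question explicitly mentions "pure Spark SQL", the same full join plus coalesce logic can also be written as a plain SQL query over temporary views. A minimal sketch, assuming the same df1/df2 with prob1/prob2 columns as below; the view names dist1 and dist2 are just placeholders I picked:

# Sketch: register the DataFrames as temp views and do the blend in SQL
df1.createOrReplaceTempView('dist1')
df2.createOrReplaceTempView('dist2')

blended = spark.sql(f"""
    SELECT COALESCE(d1.item_id, d2.item_id) AS item_id,
           {alpha} * COALESCE(d1.prob1, 0.0)
             + (1 - {alpha}) * COALESCE(d2.prob2, 0.0) AS prob
    FROM dist1 d1
    FULL OUTER JOIN dist2 d2
      ON d1.item_id = d2.item_id
""")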
Your data sample:
df1 = spark.createDataFrame(data=[
    ('item1', 0.1),
    ('item2', 0.2),
    ('item3', 0.7)
], schema=['item_id', 'prob1'])

df2 = spark.createDataFrame(data=[
    ('item2', 0.3),
    ('item3', 0.5),
    ('item4', 0.2)
], schema=['item_id', 'prob2'])
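Putting it together with this sample data and alpha = 0.5, the blended column should come out roughly as in the hand-computed values below (floating-point rounding may make the displayed numbers slightly longer):

df.select('item_id', 'prob').orderBy('item_id').show()
# Expected (approximately):
#   item1 -> 0.5*0.1 + 0.5*0.0 = 0.05
#   item2 -> 0.5*0.2 + 0.5*0.3 = 0.25
#   item3 -> 0.5*0.7 + 0.5*0.5 = 0.60
#   item4 -> 0.5*0.0 + 0.5*0.2 = 0.10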