Is there a way, without Spark UDFs, to blend two distribution DataFrames that have different support

Time:05-06

I have two DataFrames, each representing a probability distribution stored one entry per row. For example, one is df1:

item_id |  probability
--------|---------------
  item1 |      0.1
  item2 |      0.2
  item3 |      0.7

and another, let's call it df2:

item_id |  probability
--------|---------------
  item2 |      0.3
  item3 |      0.5
  item4 |      0.2

Note that the item spaces of the two DataFrames differ. That is fine: it simply means df1 assigns zero probability to item4 and df2 assigns zero probability to item1. Given some double value alpha, I'd like to produce a DataFrame that blends the two distributions, as shown below. I can write this with custom UDFs, but I'm wondering whether it can be done in pure Spark SQL using only built-in functions.

item_id |  probability
--------|---------------
  item1 |      0.1 * alpha + 0.0 * (1 - alpha)
  item2 |      0.2 * alpha + 0.3 * (1 - alpha)
  item3 |      0.7 * alpha + 0.5 * (1 - alpha)
  item4 |      0.0 * alpha + 0.2 * (1 - alpha)
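
To pin down the numbers the blended table should contain, here is a minimal plain-Python sketch (not Spark). The `blend` helper and the choice alpha = 0.5 are assumptions made purely for illustration. An item missing from one distribution is treated as having probability 0.0 there.

```python
# Reference computation in plain Python (no Spark); all names are illustrative only.
df1 = {"item1": 0.1, "item2": 0.2, "item3": 0.7}
df2 = {"item2": 0.3, "item3": 0.5, "item4": 0.2}


def blend(p, q, alpha):
    """Return alpha * p + (1 - alpha) * q over the union of both supports."""
    items = sorted(set(p) | set(q))
    # An item missing from one distribution contributes probability 0.0 there.
    return {i: alpha * p.get(i, 0.0) + (1 - alpha) * q.get(i, 0.0) for i in items}


blended = blend(df1, df2, alpha=0.5)  # alpha = 0.5 is an assumed example value
for item, prob in blended.items():
    print(item, round(prob, 4))
```

For any alpha in [0, 1] the blended values are non-negative and sum to 1, which makes a handy sanity check for whatever Spark solution is used.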

CodePudding user response:

I think this is totally doable with SQL. The secret is an "outer" join with some coalesce magic to handle nulls.

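import org.apache.spark.sql.functions.{coalesce, lit}
import spark.implicits._ // needed for toDF outside spark-shell

val someData = Seq(("item1", 0.1), ("item2", 0.2), ("item3", 0.7))
// alpha outside [0, 1] produces negative weights; 1.1 is kept to match the output shown
val alpha = 1.1

val someMoreData = Seq(("item2", 0.3), ("item3", 0.5), ("item4", 0.2))

// Note: here df1 holds the question's second distribution and df2 the first
val df1 = spark.sparkContext.parallelize(someMoreData).toDF("item_id", "probability")
val df2 = spark.sparkContext.parallelize(someData).toDF("item_id", "probability")
// Full outer join keeps items present in only one DataFrame;
// coalesce turns the resulting null probabilities into 0.0
val prob = df2
  .join(df1, df1("item_id") === df2("item_id"), "outer")
  .select(
    coalesce(df1("item_id"), df2("item_id")).alias("item_id"),
    coalesce(df1("probability"), lit(0.0)).alias("probability1"),
    coalesce(df2("probability"), lit(0.0)).alias("probability2"),
    lit(alpha).alias("alpha"))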
prob.show()
+-------+------------+------------+-----+
|item_id|probability1|probability2|alpha|
+-------+------------+------------+-----+
|  item3|         0.5|         0.7|  1.1|
|  item2|         0.3|         0.2|  1.1|
|  item1|         0.0|         0.1|  1.1|
|  item4|         0.2|         0.0|  1.1|
+-------+------------+------------+-----+

prob.select(prob("probability1") * prob("alpha") + prob("probability2").multiply(lit(1.0) - prob("alpha")), prob("item_id")).show()
+---------------------------------------------------------+-------+
|((probability1 * alpha) + (probability2 * (1.0 - alpha)))|item_id|
+---------------------------------------------------------+-------+
|                                                     0.48|  item3|
|                                                     0.31|  item2|
|                                     -0.01000000000000...|  item1|
|                                      0.22000000000000003|  item4|
+---------------------------------------------------------+-------+

CodePudding user response:

This can be solved with a full join and a simple column expression. For clarity, I renamed the probability columns in the two DataFrames to prob1 and prob2, respectively. Then it's just something like:

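from pyspark.sql import functions as F

alpha = 0.5
# Full join keeps items found in only one DataFrame; fillna(0) zeroes their missing probability
df = df1 \
    .join(df2, how='full', on='item_id') \
    .fillna(0) \
    .withColumn('prob', alpha * F.col('prob1') + (1 - alpha) * F.col('prob2'))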

Your data sample:

df1 = spark.createDataFrame(data=[
    ('item1', 0.1),
    ('item2', 0.2),
    ('item3', 0.7)
], schema=['item_id', 'prob1'])

df2 = spark.createDataFrame(data=[
    ('item2', 0.3),
    ('item3', 0.5),
    ('item4', 0.2)
], schema=['item_id', 'prob2'])