Update rows of dataframe according to the content of a map


I have this dataframe:

+------+----------+------+
|brand |Timestamp |Weight|
+------+----------+------+
|BR1   |1632899456|4.0   |
|BR1   |1632901256|4.0   |
|BR300 |1632901796|2.0   |
|BR300 |1632899155|2.0   |
|BR200 |1632899155|2.0   |
+------+----------+------+

And this map:

val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)

I would like to update the "Weight" column according to the contents of the map.

The goal is, for each row, to add the map value for that brand to the weight already in the row.

The output should look like this:

+------+----------+------+
|brand |Timestamp |Weight|
+------+----------+------+
|BR1   |1632899456|74.0  |
|BR1   |1632901256|74.0  |
|BR300 |1632901796|92.0  |
|BR300 |1632899155|92.0  |
|BR200 |1632899155|2.0   |
+------+----------+------+

I am using Spark version 3.0.2 with SQLContext, in Scala.
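
For reference, a minimal, self-contained setup that reproduces this input (the column types are assumptions based on the sample values):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// Sample rows matching the table above
val df = Seq(
  ("BR1",   1632899456, 4.0),
  ("BR1",   1632901256, 4.0),
  ("BR300", 1632901796, 2.0),
  ("BR300", 1632899155, 2.0),
  ("BR200", 1632899155, 2.0)
).toDF("brand", "Timestamp", "Weight")

val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)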

CodePudding user response:

The map can be translated into an SQL expression. This avoids a UDF and therefore might improve performance.

import org.apache.spark.sql.functions.expr

val df = ...
val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)

// Fold the map into a single "Weight + case brand ... end" expression;
// brands missing from the map add 0.0, leaving the weight unchanged.
val sql = map.foldLeft("Weight + case brand")((a, b) => s"$a when '${b._1}' then ${b._2}") + " else 0.0 end"
df.withColumn("Weight", expr(sql)).show()

The generated SQL string is:

Weight + case brand when 'BR1' then 70.0 when 'BR300' then 90.0 else 0.0 end

Output:

+-----+----------+------+
|brand| Timestamp|Weight|
+-----+----------+------+
|  BR1|1632899456|  74.0|
|  BR1|1632901256|  74.0|
|BR300|1632901796|  92.0|
|BR300|1632899155|  92.0|
|BR200|1632899155|   2.0|
+-----+----------+------+
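
A variant sketch of the same idea uses the Column API instead of a generated SQL string, assuming the df and map from above (typedLit and element_at are available in this Spark version):

import org.apache.spark.sql.functions.{coalesce, col, element_at, lit, typedLit}

// Embed the Scala map as a literal map column and look up each brand;
// element_at yields null for missing keys, which coalesce replaces with 0.0.
val weightByBrand = typedLit(map)
df.withColumn(
  "Weight",
  col("Weight") + coalesce(element_at(weightByBrand, col("brand")), lit(0.0))
).show()

Like the case expression, this stays entirely inside Catalyst, so no UDF is needed.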

CodePudding user response:

You can use a UDF to look up the value in the map and then add it to the column value.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

import org.apache.spark.sql.functions._
import spark.implicits._

val df = Seq(
  ("BR1", 1632901256, 4.0),
  ("BR300", 1632901796, 2.0),
  ("BR200", 1632899155, 2.0)
).toDF("brand", "Timestamp", "Weight")

val map: Map[String, Double] = Map("BR1" -> 70.0, "BR300" -> 90.0)

// Broadcast the map so each executor receives a single read-only copy
val broadcastedMap = spark.sparkContext.broadcast(map)

// Look up the brand in the map; brands without an entry contribute 0.0
val getvalueFromMap = udf((s: String) => broadcastedMap.value.getOrElse(s, 0.0))

df.withColumn("Weight", getvalueFromMap('brand) + 'Weight).show()

/*
+-----+----------+------+
|brand| Timestamp|Weight|
+-----+----------+------+
|  BR1|1632901256|  74.0|
|BR300|1632901796|  92.0|
|BR200|1632899155|   2.0|
+-----+----------+------+
*/
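
The two answers can also be combined without a UDF or string building by folding the map into a chain of when expressions; a sketch, assuming the df and map defined above:

import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{col, lit, when}

// Fold the map into nested when(...).otherwise(...) Columns;
// brands absent from the map fall through to lit(0.0).
val bonus: Column = map.foldLeft(lit(0.0)) {
  case (acc, (brand, w)) => when(col("brand") === brand, w).otherwise(acc)
}

df.withColumn("Weight", col("Weight") + bonus).show()

This builds an equivalent case-when plan while keeping the expression in plain Scala rather than an SQL string.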