Looping over a Scala list in Spark


I have a Scala list as below.

partList: ListBuffer(2021-10-01, 2021-10-02, 2021-10-03, 2021-10-04, 2021-10-05, 2021-10-06, 2021-10-07, 2021-10-08)

Currently I'm reading all the data from the source into a dataframe based on the dates above.

fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")

Later I'm doing a few transformations and loading the data into a Delta table. The sample code is below.

fctDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")

if (fctExistingDF.count() > 0) {
  fctDF.createOrReplaceTempView("vw_exist_fct")
  val existingRecordsQuery = getExistingRecordsMergeQuery(azUpdateTS, key)

  // Run the merge query and append the result to the merge table
  ss.sql(existingRecordsQuery)
    .drop("az_insert_ts").drop("az_update_ts")
    .withColumn("az_insert_ts", col("new_az_insert_ts"))
    .withColumn("az_update_ts", col("new_az_update_ts"))
    .drop("new_az_insert_ts").drop("new_az_update_ts")
    .select(mrg_tbl_cols(0), mrg_tbl_cols.slice(1, mrg_tbl_cols.length): _*)
    .coalesce(72 * 2)
    .write.mode("Append").format("delta")
    .insertInto(mergeTable)
}

// Read the merged data back and overwrite the target table
mergedDataDF = ss.read.table(mergeTable).coalesce(72 * 2)

mergedDataDF.coalesce(72)
  .write.mode("Overwrite").format("delta")
  .insertInto(s"${tgtSchema}.${tgtTbl}")
      

The command below creates a dataframe by filtering on the event_date values present in partList.

fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")

Since this creates a dataframe with a huge amount of data, I want to loop over each date in partList and read the data into the dataframe one date at a time, instead of filtering on all the dates in partList at once.

I tried the following.

var counter = 0

while (counter < partList.length) {
  // I should pass the 1st date from the list here
  fctExistingDF = ss.read.table(existingTable).filter(s"event_date in (...)")
  counter = counter + 1
}
 

I am new to Scala; maybe I should use foreach here? Could someone please help? Thank you.

CodePudding user response:

You can use foreach or map, depending on whether you want to return the values (map) or not (foreach):

import org.apache.spark.sql.functions.col

val partList = List("2021-10-01", "2021-10-02", "2021-10-03", "2021-10-04", "2021-10-05", "2021-10-06", "2021-10-07", "2021-10-08")

partList.foreach { date =>
  fctExistingDF = ss.read.table(existingTable).filter(col("event_date") === date)
}
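
If the real goal is to keep only one date's data in memory per iteration, the rest of the merge logic can also be moved inside the loop. The sketch below simply reuses the variables and helpers from the question (ss, existingTable, mergeTable, azUpdateTS, key, mrg_tbl_cols, getExistingRecordsMergeQuery), so treat it as a starting point rather than a drop-in solution:

import org.apache.spark.sql.functions.col

partList.foreach { date =>
  // Read only the current date's data
  val dailyDF = ss.read.table(existingTable).filter(col("event_date") === date)

  if (dailyDF.count() > 0) {
    dailyDF.createOrReplaceTempView("vw_exist_fct")
    val existingRecordsQuery = getExistingRecordsMergeQuery(azUpdateTS, key)

    // Apply the same transformations as before, but only for this date
    ss.sql(existingRecordsQuery)
      .drop("az_insert_ts").drop("az_update_ts")
      .withColumn("az_insert_ts", col("new_az_insert_ts"))
      .withColumn("az_update_ts", col("new_az_update_ts"))
      .drop("new_az_insert_ts").drop("new_az_update_ts")
      .select(mrg_tbl_cols(0), mrg_tbl_cols.slice(1, mrg_tbl_cols.length): _*)
      .write.mode("Append").format("delta")
      .insertInto(mergeTable)
  }
}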

If you want to return a list of dataframes, use:

val dfs = partList.map { date =>
  ss.read.table(existingTable).filter(col("event_date") === date)
}
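
If you later need all of the dates in a single dataframe again, the list returned by map can be combined. A small sketch, assuming dfs above is a non-empty list of dataframes:

// Union the per-date dataframes back into one
val allDatesDF = dfs.reduce(_ union _)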