I have a Scala list as below.
partList: ListBuffer(2021-10-01, 2021-10-02, 2021-10-03, 2021-10-04, 2021-10-05, 2021-10-06, 2021-10-07, 2021-10-08)
Currently I'm reading all the data from the source into a DataFrame, filtered on the dates above.
fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
Later I apply a few transformations and load the data into a Delta table. The sample code is below.
fctDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
if (fctExistingDF.count() > 0) {
fctDF.createOrReplaceTempView("vw_exist_fct")
val existingRecordsQuery = getExistingRecordsMergeQuery(azUpdateTS,key)
ss.sql(existingRecordsQuery)
.drop("az_insert_ts").drop("az_update_ts")
.withColumn("az_insert_ts", col("new_az_insert_ts"))
.withColumn("az_update_ts", col("new_az_update_ts"))
.drop("new_az_insert_ts").drop("new_az_update_ts")
.select(mrg_tbl_cols(0), mrg_tbl_cols.slice(1,mrg_tbl_cols.length): _*)
.coalesce(72*2)
.write.mode("Append").format("delta")
.insertInto(mergeTable)
mergedDataDF = ss.read.table(mergeTable).coalesce(72*2)
mergedDataDF.coalesce(72)
.write.mode("Overwrite").format("delta")
.insertInto(s"${tgtSchema}.${tgtTbl}")
The command below creates a DataFrame by filtering on the event_date values present in partList.
fctExistingDF = ss.read.table(existingTable).filter(s"event_date in ('${partList.mkString("','")}')")
Since this creates a DataFrame holding a huge amount of data, I want to loop over each date in partList and read the data for one date at a time, instead of filtering on all the dates in partList at once.
I tried the following.
var counter = 0
while (counter < partList.length) {
fctExistingDF = ss.read.table(existingTable).filter(s"event_date in (I should pass 1st date from the list)")
counter = counter + 1
}
I am new to Scala; maybe I should use foreach here? Could someone please help? Thank you.
CodePudding user response:
You can use foreach or map, depending on whether you want to return the values (map) or not (foreach):
import org.apache.spark.sql.functions.col
val partList = List("2021-10-01", "2021-10-02", "2021-10-03", "2021-10-04", "2021-10-05", "2021-10-06", "2021-10-07", "2021-10-08")
partList.foreach { date =>
  // Read one date at a time; do the transformations and the write inside this block
  val fctExistingDF = ss.read.table(existingTable).filter(col("event_date") === date)
}
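If you would rather keep the SQL-string style of filter from the question, the per-date version only needs the single date interpolated into the string. A minimal sketch in plain Scala (no Spark required), using a shortened copy of the question's list; the names quoted, allDates and perDate are made up for illustration:

```scala
// Shortened copy of the dates from the question's partList
val partList = List("2021-10-01", "2021-10-02", "2021-10-03")

// Original style: one predicate that covers every date at once
val quoted = partList.mkString("','")
val allDates = s"event_date in ('$quoted')"

// Per-date style: one equality predicate per element of the list
val perDate = partList.map(date => s"event_date = '$date'")

println(allDates)
perDate.foreach(println)
```

Each string in perDate can be passed to .filter(...) in place of the IN-list string.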
If you want to return a list of DataFrames, use:
val dfs = partList.map { date =>
  // The last expression is the return value, so do not assign it to a variable here
  ss.read.table(existingTable).filter(col("event_date") === date)
}
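Keep in mind that Spark evaluates lazily: the filter by itself reads nothing, so splitting by date only reduces the amount of data handled at once if the write also happens inside the loop. A rough sketch of that idea, assuming the transformation can be expressed as a DataFrame => DataFrame function; processOneDate, sourceTable, targetTable and transform are hypothetical names, not part of the question's code:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Hypothetical helper: read one day's rows, transform them, append to the target table
def processOneDate(
    ss: SparkSession,
    sourceTable: String,
    targetTable: String,
    date: String
)(transform: DataFrame => DataFrame): Unit = {
  val dayDF = ss.read.table(sourceTable).filter(col("event_date") === date)
  // isEmpty (Spark 2.4+) avoids a full count() just to check whether rows exist
  if (!dayDF.isEmpty) {
    transform(dayDF)
      .write.mode("append").format("delta")
      .insertInto(targetTable)
  }
}
```

It could then be called as partList.foreach(d => processOneDate(ss, existingTable, mergeTable, d)(identity)), replacing identity with your own transformations.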