Home > Back-end >  Spark Scala : Add new column without looping throw same table many times
Spark Scala : Add new column without looping throw same table many times

Time:09-02

I want to add a new column called activeIOsAtSite that has a value of 'Y' or 'N'.

the value will be added according to the following condition:

For each column SITE_SITE_ID in row of the table bellow; if the SITE_SITE_ID has:

(APPLICATION_SOURCE == 'CIBASE' AND STANDARD_STATUS =='ACTIVE')

In this case the value should be 'Y' otherwise 'N'

IS there a was to do that without iterate on the same table over and over again (for each row), because the table I have is so big and I need to do that in the fastest way possible?

Example of desired result:

enter image description here

I tried to do something like that, but I'm not sure if it is correct:

finvInventoryAllDf
  .withColumn(
    "activeIOsAtSite",
    activeIOsAtSiteGenerator(finvInventoryAllDf, col("Site_siteId"))
  )

with activeIOsAtSiteGenerator() is a function where I verify the conditions above:

  def activeIOsAtSiteGenerator(dataFrame: DataFrame, site_siteId: Column): Column = {
    val count = dataFrame
      .where(col("Site_siteId") === site_siteId)
      .where("InstalledOffer_installedOfferId IS NOT NULL AND InstalledOffer_installedOfferId NOT IN ('','null','NULL') AND UPPER(InstalledOffer_standardStatus) IN ('ACTIVE') AND UPPER(InstalledOffer_applicationSource) IN('CIBASE')")
      .count()
    if (count > 0)
      lit("Y")
    else
      lit("N")
  }

CodePudding user response:

You can first groupBy the unique ID, then collect_set to check whether the column contains any of the combo you mentioned.

var grouped = df
  .groupBy("SITE_SITE_ID").agg(collect_set(array("APPLICATION_SOURCE", "STANDARD_STATUS")).as("array"))
  .withColumn("indicator",
    expr("transform(array, x -> array_contains(x, 'CIBASE') and array_contains(x, 'ACTIVE'))")
  )

In case order matters:

.withColumn("indicator",
    expr("transform(array, x -> lower(element_at(x, 1)) = 'cibase' and lower(element_at(x, 2)) = 'active')")
)

Current form of what we have:

 ------------ ------------------------------------ ------------- 
|SITE_SITE_ID|array                               |indicator    |
 ------------ ------------------------------------ ------------- 
|si_2        |[[SLOW, STASH]]                     |[false]      | <- make this N
|si_3        |[[MEDIUM, TREE]]                    |[false]      | <- make this N
|si_1        |[[FAST, DISABLED], [CIBASE, ACTIVE]]|[false, true]| <- make this Y (pair found)
 ------------ ------------------------------------ ------------- 

Then we move on:

  grouped = grouped.withColumn("indicator",
    when(array_contains(col("indicator"), true), "Y").otherwise("N")
  )
  .drop("array")
 ------------ --------- 
|SITE_SITE_ID|indicator|
 ------------ --------- 
|si_2        |N        |
|si_3        |N        |
|si_1        |Y        |
 ------------ --------- 

The collected_set returns an array of arrays, that is why we check for the combo, and we check again, if there is one true within the array (the combo has been found), return Y otherwise N; finally, we drop array column.

Grouped's sample:

 ------------ --------- 
|SITE_SITE_ID|indicator|
 ------------ --------- 
|si_2        |N        |
|si_3        |N        |
|si_1        |Y        |
 ------------ --------- 

Finally, we join our main table with grouped:

df.join(grouped, Seq("SITE_SITE_ID"))

Final result:

 ------------ ----- ------------------ --------------- --------- 
|SITE_SITE_ID|IR_ID|APPLICATION_SOURCE|STANDARD_STATUS|indicator|
 ------------ ----- ------------------ --------------- --------- 
|si_2        |ir2  |SLOW              |STASH          |N        |
|si_3        |ir3  |MEDIUM            |TREE           |N        |
|si_1        |ir1  |FAST              |DISABLED       |Y        |
|si_1        |ir4  |CIBASE            |ACTIVE         |Y        |
 ------------ ----- ------------------ --------------- --------- 

Good luck!

  • Related