Compare rows in a dataframe in spark, to assign value to a column based on comparison of rows

Time:12-18

My data looks like this:

Journey Table

SERNR TYPE
123 null
456 null
789 null

Segment Table

SERNR Sgmnt FROM-Station TO-Station
123 01 A B
123 02 B C
123 03 C B
123 04 B A
456 01 A B
456 02 B C
456 01 C D
456 01 D A
789 04 A B

I want to join these two DataFrames/tables and check the FROM and TO stations of each journey to decide its type: if it is a return journey, type A; if it is a mirror return, type B; if it is a one-way journey, type C.

The type is calculated as follows:

Say that for journey SERNR 123 the segments are A->B, B->C, C->B, B->A. This is a mirror journey, because it goes A-B-C and then C-B-A.

For 789 it is A->B, so it is a normal (one-way) journey.

For 456 it is A->B, B->C, C->D, D->A, in short A-B-C and then C-D-A. This is a return journey but not a mirror.

I don't know how to compare rows of a DataFrame that share the same SERNR in order to decide the type from their FROM and TO stations.

I would really appreciate a pointer on how to implement this.

Answer:

Use collect_list on the FROM or TO station, grouping by SERNR and ordering by segment number.
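A minimal Spark sketch of this grouping approach, assuming Spark 2.4 or later (for `element_at` and array `concat`/`reverse`), station columns already renamed to `FROM` and `TO`, and `Sgmnt` values that are unique within each SERNR (the sample rows for 456 repeat `01`, which makes any ordering by segment ambiguous). `JourneyTypeByGroup` is a hypothetical name. Collecting structs and sorting them with `sort_array` keeps the segment order without a window, and comparing arrays instead of a joined string also works for station codes longer than one character:

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

object JourneyTypeByGroup {
  // Assumes columns SERNR, Sgmnt, FROM, TO (rename FROM-Station / TO-Station first)
  // and that Sgmnt is unique within each SERNR.
  def classify(segments: DataFrame): DataFrame =
    segments
      .groupBy("SERNR")
      // sort_array orders the structs by their first field, Sgmnt
      .agg(sort_array(collect_list(struct(col("Sgmnt"), col("FROM"), col("TO")))).as("segs"))
      // station path: every FROM in segment order, followed by the last TO
      .withColumn("path", concat(col("segs.FROM"), array(element_at(col("segs.TO"), -1))))
      .withColumn(
        "TYPE",
        when(element_at(col("path"), 1) =!= element_at(col("path"), -1), "normal")
          .when(reverse(col("path")) === col("path"), "mirror")
          .otherwise("return")
      )
      .select("SERNR", "TYPE")
}
```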

Answer:

You can collect the FROM-TO pairs of each SERNR into an array column, then join the array elements into a journey_path string (for 123 this gives A-B-B-C-C-B-B-A).

Once you have the journey path for each journey, you can use a when expression to determine the TYPE:

  • If the first FROM differs from the last TO, it is normal.
  • Otherwise, if the reversed journey_path equals the journey_path, it is a mirror; if not, it is a return.
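The two rules above can be tried on plain Scala collections before wiring them into Spark. This sketch (the `JourneyType.classify` helper is hypothetical) takes one journey's (FROM, TO) pairs already sorted by segment and assumes consecutive segments connect, so the station path is every FROM followed by the last TO:

```scala
object JourneyType {
  // Hypothetical helper: classifies one journey from its (FROM, TO) pairs,
  // assumed to be sorted by segment number and to connect end to start.
  def classify(segments: Seq[(String, String)]): String = {
    require(segments.nonEmpty, "a journey needs at least one segment")
    // Station path, e.g. A->B, B->C, C->B, B->A gives A, B, C, B, A
    val path = segments.map(_._1) :+ segments.last._2
    if (path.head != path.last) "normal"     // does not end where it started
    else if (path == path.reverse) "mirror"  // same stations on the way back
    else "return"                            // round trip via other stations
  }
}
```

Working on a list of stations rather than a joined string avoids the single-character limitation of a substring or string-reverse check.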

Note that you need a Window to keep the segments in order when grouping and collecting the list of FROM-TO pairs.

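import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Assumes the station columns are named FROM and TO; with the names shown in the
// question, rename them first, e.g.
// segment_df.withColumnRenamed("FROM-Station", "FROM").withColumnRenamed("TO-Station", "TO")

// One window per SERNR, ordered by segment and spanning the whole partition,
// so every row sees the complete list of segments
val w = Window.partitionBy("SERNR").orderBy("Sgmnt").rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val result = segment_df.select(
    col("SERNR"),
    // "A-B", "B-C", ... joined into a single string such as "A-B-B-C-C-B-B-A"
    array_join(
      collect_list(concat_ws("-", col("FROM"), col("TO"))).over(w),
      "-"
    ).alias("journey_path")
  ).dropDuplicates(Seq("SERNR")).withColumn(
    "TYPE",
    // substring is 1-based; (-1, 1) takes the last character.
    // These character checks and the string reverse assume single-character station codes.
    when(
      substring(col("journey_path"), 1, 1) =!= substring(col("journey_path"), -1, 1),
      "normal"
    ).otherwise(
      when(
        reverse(col("journey_path")) === col("journey_path"),
        "mirror"
      ).otherwise("return")
    )
  )
  .drop("journey_path")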

result.show
//+-----+------+
//|SERNR|  TYPE|
//+-----+------+
//|  789|normal|
//|  456|return|
//|  123|mirror|
//+-----+------+
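The question also asks to fill the TYPE column of the Journey table. Since `result` holds only SERNR and TYPE, one option is a left join back onto the journey table. This sketch assumes the journey DataFrame has the SERNR and TYPE columns shown in the question; `FillJourneyType.fill` is a hypothetical helper name:

```scala
import org.apache.spark.sql.DataFrame

object FillJourneyType {
  // Hypothetical helper: replaces the all-null TYPE column of the journey table
  // with the computed type. Journeys without segment rows keep a null TYPE.
  def fill(journeys: DataFrame, types: DataFrame): DataFrame =
    journeys
      .drop("TYPE")                       // discard the placeholder column
      .join(types, Seq("SERNR"), "left")  // result columns: SERNR, TYPE
}
```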