My data looks like this:

Journey table:

SERNR | TYPE |
---|---|
123 | null |
456 | null |
789 | null |

Segment table:

SERNR | Sgmnt | FROM-Station | TO-Station |
---|---|---|---|
123 | 01 | A | B |
123 | 02 | B | C |
123 | 03 | C | B |
123 | 04 | B | A |
456 | 01 | A | B |
456 | 02 | B | C |
456 | 01 | C | D |
456 | 01 | D | A |
789 | 04 | A | B |

I want to join these two DataFrames/tables and check the journey's FROM and TO stations to decide the journey type, i.e. if it is a return journey, some type A; if it is a mirror return, some type B; if it is a one-way journey, some type C.
The type is calculated as follows:

- For journey SERNR 123 the segments are A->B, B->C, C->B, B->A. This is a mirror journey, because it goes A-B-C and then C-B-A.
- For 789 it is A->B, so it is a normal (one-way) journey.
- For 456 it is A->B, B->C, C->D, D->A, in short A-B-C and then C-D-A. This is a return, but not a mirror.
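
The rule behind these three examples can be sketched in plain Scala, independent of Spark. This is a minimal sketch assuming each journey's segments are already sorted by `Sgmnt` and non-empty; `Segment`, `stationPath` and `journeyType` are hypothetical names used only for illustration. The station path is every FROM followed by the final TO, and the type follows from comparing its ends and its reverse.

```scala
// Hypothetical model of one row of the Segment table.
final case class Segment(from: String, to: String)

// Stations visited in order: every FROM, plus the last TO.
// Assumes the segments are non-empty and already ordered by Sgmnt.
def stationPath(segments: Seq[Segment]): Seq[String] =
  segments.map(_.from) :+ segments.last.to

// Classify a journey using the rules from the examples above.
def journeyType(segments: Seq[Segment]): String = {
  val path = stationPath(segments)
  if (path.head != path.last) "normal"     // ends somewhere else: one-way
  else if (path == path.reverse) "mirror"  // A-B-C-B-A reads the same backwards
  else "return"                            // back at the start by another route
}

val j123 = Seq(Segment("A", "B"), Segment("B", "C"), Segment("C", "B"), Segment("B", "A"))
val j456 = Seq(Segment("A", "B"), Segment("B", "C"), Segment("C", "D"), Segment("D", "A"))
val j789 = Seq(Segment("A", "B"))

println(journeyType(j123)) // mirror
println(journeyType(j456)) // return
println(journeyType(j789)) // normal
```

Working on a sequence of station names (rather than a joined string) means multi-letter station codes are compared as whole names.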

I really don't know how to compare rows in a DataFrame by SERNR, i.e. decide the type by checking the FROM and TO stations of all rows with the same SERNR.
I would really appreciate a pointer on how to implement this.

Answer:

Use `collect_list` on `FROM-Station` or `TO-Station`, grouping by `SERNR` and ordering by segment (`Sgmnt`).
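
A minimal sketch of this hint, assuming Spark 2.4+ and the question's column names; `segmentDf` and its sample rows (journey 123 only) are illustrative. Because `collect_list` after `groupBy` does not guarantee order, it collects `struct(Sgmnt, from, to)` and sorts the array with `sort_array`, which orders structs field by field, so `Sgmnt` decides the order.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("collect-sketch").getOrCreate()
import spark.implicits._

// Sample rows shaped like the Segment table, deliberately out of order.
val segmentDf = Seq(
  ("123", "02", "B", "C"),
  ("123", "01", "A", "B"),
  ("123", "04", "B", "A"),
  ("123", "03", "C", "B")
).toDF("SERNR", "Sgmnt", "FROM-Station", "TO-Station")

val ordered = segmentDf
  .groupBy("SERNR")
  .agg(
    // Sgmnt is the first struct field, so sort_array orders by segment.
    sort_array(
      collect_list(struct(col("Sgmnt"), col("FROM-Station").as("from"), col("TO-Station").as("to")))
    ).as("segs")
  )
  // Selecting a field of an array of structs yields an array of that field.
  .select(col("SERNR"), col("segs.from").as("froms"), col("segs.to").as("tos"))

ordered.show(false)
```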

Answer:

You can collect the list of `FROM-TO` segments into an array column for each `SERNR`, then join the array elements to get a `journey_path` (for 123: `A-B-B-C-C-B-B-A`).
Once you have the journey path for each journey, you can use a `when` expression to determine the `TYPE`:
- If the first `FROM` != the last `TO`, it's `normal`.
- Otherwise, if the reverse of `journey_path` equals `journey_path`, it's `mirror`; if not, it's `return`.

Note that you need to use a Window ordered by `Sgmnt` to keep the segments in order when collecting the list of `FROM-TO` pairs; `collect_list` after a plain `groupBy` does not guarantee any order.

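```scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// One window per journey, ordered by segment and spanning the whole partition,
// so every row sees the complete, ordered list of segments.
val w = Window
  .partitionBy("SERNR")
  .orderBy("Sgmnt")
  .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

val result = segment_df
  .select(
    col("SERNR"),
    // "A-B", "B-C", ... joined with "-", e.g. "A-B-B-C-C-B-B-A" for 123
    array_join(
      collect_list(concat_ws("-", col("FROM-Station"), col("TO-Station"))).over(w),
      "-"
    ).alias("journey_path")
  )
  .dropDuplicates(Seq("SERNR")) // every row of a journey carries the same path
  .withColumn(
    "TYPE",
    // These string checks assume one-character station codes.
    when(
      substring(col("journey_path"), 1, 1) =!= substring(col("journey_path"), -1, 1),
      "normal"
    ).otherwise(
      when(reverse(col("journey_path")) === col("journey_path"), "mirror")
        .otherwise("return")
    )
  )
  .drop("journey_path")

result.show()
// +-----+------+
// |SERNR|  TYPE|
// +-----+------+
// |  789|normal|
// |  456|return|
// |  123|mirror|
// +-----+------+
```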
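
The string checks in this answer compare single characters and reverse the whole string, so they assume one-letter station codes (a station `AB` would be reversed to `BA`). A sketch that works on an array of station names instead, then joins the result back onto the Journey table as the question asks, might look like this. It assumes Spark 2.4+ (`element_at`, and `concat`/`reverse` on arrays); `journeyDf` and `segmentDf` are illustrative names, and the 456 segments are assumed to be numbered 01 to 04 (the question's table repeats 01, which would leave their order undefined).

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().master("local[*]").appName("journey-type").getOrCreate()
import spark.implicits._

// Journey table with an empty TYPE column.
val journeyDf = Seq[(String, String)](("123", null), ("456", null), ("789", null))
  .toDF("SERNR", "TYPE")

// Segment table; ASSUMPTION: 456 is numbered 01..04 so its order is unambiguous.
val segmentDf = Seq(
  ("123", "01", "A", "B"), ("123", "02", "B", "C"), ("123", "03", "C", "B"), ("123", "04", "B", "A"),
  ("456", "01", "A", "B"), ("456", "02", "B", "C"), ("456", "03", "C", "D"), ("456", "04", "D", "A"),
  ("789", "04", "A", "B")
).toDF("SERNR", "Sgmnt", "FROM-Station", "TO-Station")

val types = segmentDf
  .groupBy("SERNR")
  // Segments sorted by Sgmnt (first struct field).
  .agg(sort_array(collect_list(
    struct(col("Sgmnt"), col("FROM-Station").as("from"), col("TO-Station").as("to"))
  )).as("segs"))
  // Station path: every FROM plus the last TO, as an array of names.
  .withColumn("path", concat(col("segs.from"), array(element_at(col("segs.to"), -1))))
  .withColumn(
    "TYPE",
    when(element_at(col("path"), 1) =!= element_at(col("path"), -1), "normal")
      .when(reverse(col("path")) === col("path"), "mirror")
      .otherwise("return")
  )
  .select("SERNR", "TYPE")

// Replace the empty TYPE column of the Journey table with the computed one.
val journeyWithType = journeyDf.drop("TYPE").join(types, Seq("SERNR"), "left")

journeyWithType.orderBy("SERNR").show()
```

The left join keeps journeys that have no segments; their `TYPE` stays null.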