Compare consecutive rows and extract words (excluding the subsets) in Spark


I am working on a Spark dataframe. The input dataframe looks like Table 1 below. I need to write logic that extracts the keywords with maximum length for each session_id. Each session_id can contribute multiple keywords to the output. The expected output looks like Table 2.

Input dataframe:

(Table 1)
|-----------|------------|-----------------------------------|
| session_id| value      |  Timestamp                        |
|-----------|------------|-----------------------------------|
|     1     | cat        | 2021-01-11T13:48:54.2514887-05:00 |
|     1     | catc       | 2021-01-11T13:48:54.3514887-05:00 |
|     1     | catch      | 2021-01-11T13:48:54.4514887-05:00 |
|     1     | par        | 2021-01-11T13:48:55.2514887-05:00 |
|     1     | part       | 2021-01-11T13:48:56.5514887-05:00 |
|     1     | party      | 2021-01-11T13:48:57.7514887-05:00 |
|     1     | partyy     | 2021-01-11T13:48:58.7514887-05:00 |
|     2     | fal        | 2021-01-11T13:49:54.2514887-05:00 |
|     2     | fall       | 2021-01-11T13:49:54.3514887-05:00 |
|     2     | falle      | 2021-01-11T13:49:54.4514887-05:00 |
|     2     | fallen     | 2021-01-11T13:49:54.8514887-05:00 |
|     2     | Tem        | 2021-01-11T13:49:56.5514887-05:00 |
|     2     | Temp       | 2021-01-11T13:49:56.7514887-05:00 |
|-----------|------------|-----------------------------------|

Expected Output:

(Table 2)
|-----------|------------|
| session_id| value      |
|-----------|------------|
|     1     | catch      |
|     1     | partyy     |
|     2     | fallen     |
|     2     | Temp       |
|-----------|------------|

Solution I tried:

I added another column called col_length, which captures the length of each word in the value column, and then tried to compare each row with its subsequent row to see whether it has the maximum length. But this solution only works partially.

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

val df = spark.read.parquet("/project/project_name/abc")

// Add the length of each typed word
val dfM = df
  .select($"session_id", $"value", $"Timestamp")
  .withColumn("col_length", length($"value"))

// Running maximum of col_length, ordered by session_id only
val ts = Window
  .orderBy("session_id")
  .rangeBetween(Window.unboundedPreceding, Window.currentRow)

// Keep only the rows whose length equals the running maximum
val result = dfM
  .withColumn("running_max", max("col_length") over ts)
  .where($"running_max" === $"col_length")
  .select("session_id", "value", "Timestamp")

Current Output:

|-----------|------------|
| session_id| value      |
|-----------|------------|
|     1     | catch      |
|     2     | fallen     |
|-----------|------------|

Multiple columns do not work inside the orderBy clause of the window function, so I didn't get the desired output; I only got one row per session_id. Any suggestions would be highly appreciated. Thanks in advance.

CodePudding user response:

You can solve it by using the lead function:

// Order by session and typing time so consecutive words within a session are compared
val windowSpec = Window.orderBy("session_id", "Timestamp")

dfM
  // value typed right after the current one (null for the very last row)
  .withColumn("lead", lead("value", 1).over(windowSpec))
  // keep a word when the next word is shorter (a new word was started)
  // or when there is no next word at all
  .filter((length(col("lead")) < length(col("value"))) || col("lead").isNull)
  .drop("lead")
  .show()
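A per-session variant of the same idea (a minimal sketch, assuming the dfM dataframe from the question is in scope and using a next_value column name chosen here for illustration) partitions the window by session_id and orders it by Timestamp, so the comparison never crosses a session boundary and a null lead marks the last word of each session:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._

// Look up the next word only within the same session, in typing order
val perSession = Window.partitionBy("session_id").orderBy("Timestamp")

dfM
  .withColumn("next_value", lead("value", 1).over(perSession))
  // Keep a word when the following word is shorter (a new word started)
  // or when it is the last word of the session
  .filter(length(col("next_value")) < length(col("value")) || col("next_value").isNull)
  .select("session_id", "value")
  .show()

On the sample data above this keeps catch, partyy, fallen and Temp, matching Table 2, and the partitioned window also avoids pulling the whole dataframe into a single partition, which happens with a global, non-partitioned window.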