I have a dataframe as shown below.
+-----+------------+------------+
| ID  | date       | sig01_diff |
+-----+------------+------------+
| 123 | 2019-11-04 | 93668      |
| 123 | 2019-11-05 | 49350      |
| 123 | 2019-11-07 | null       |
| 123 | 2019-11-08 | 11069      |
| 123 | 2019-11-09 | 33203      |
| 123 | 2019-11-11 | 47927      |
| 123 | 2020-01-21 | null       |
| 123 | 2020-01-22 | null       |
| 123 | 2020-01-23 | 33908      |
| 123 | 2020-01-24 | 61603      |
| 123 | 2020-01-27 | 33613      |
| 123 | 2020-01-28 | 27514      |
| 123 | 2020-01-29 | null       |
| 123 | 2020-01-30 | null       |
| 123 | 2020-02-11 | null       |
| 123 | 2020-02-12 | null       |
| 123 | 2020-02-13 | null       |
| 123 | 2020-02-14 | null       |
| 123 | 2020-02-15 | 65625      |
| 123 | 2020-02-17 | 13354      |
| 123 | 2020-02-18 | null       |
| 123 | 2020-02-19 | 69069      |
+-----+------------+------------+
I need to get the number of null records preceding each record, as shown below.
+-----+------------+------------+------------+
| ID  | date       | sig01_diff | null_count |
+-----+------------+------------+------------+
| 123 | 2019-11-04 | 93668      | 00         |
| 123 | 2019-11-05 | 49350      | 00         |
| 123 | 2019-11-07 | null       | 00         |
| 123 | 2019-11-08 | 11069      | 01         |
| 123 | 2019-11-09 | 33203      | 00         |
| 123 | 2019-11-11 | 47927      | 00         |
| 123 | 2020-01-21 | null       | 00         |
| 123 | 2020-01-22 | null       | 00         |
| 123 | 2020-01-23 | 33908      | 02         |
| 123 | 2020-01-24 | 61603      | 00         |
| 123 | 2020-01-27 | 33613      | 00         |
| 123 | 2020-01-28 | 27514      | 00         |
| 123 | 2020-01-29 | null       | 00         |
| 123 | 2020-01-30 | null       | 00         |
| 123 | 2020-02-11 | null       | 00         |
| 123 | 2020-02-12 | null       | 00         |
| 123 | 2020-02-13 | null       | 00         |
| 123 | 2020-02-14 | null       | 00         |
| 123 | 2020-02-15 | 65625      | 06         |
| 123 | 2020-02-17 | 13354      | 00         |
| 123 | 2020-02-18 | null       | 00         |
| 123 | 2020-02-19 | 69069      | 01         |
+-----+------------+------------+------------+
As shown above, the new column holds the count of null records immediately preceding each record. See, for example, the rows for these dates:
2019-11-08
2020-02-15
Using a window function with unboundedPreceding, I am able to get a running count of null records within a window. But what I need, within a window, is the count of null records between two non-null records.
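For reference, the running count I have so far looks roughly like this (simplified; assuming my DataFrame is called df with the columns above):
import org.apache.spark.sql.functions.{col, count, when}
import org.apache.spark.sql.expressions.Window

// Running count of nulls per ID, ordered by date, over all rows
// strictly before the current one.
val w = Window
  .partitionBy("ID")
  .orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)

val withRunningCount = df.withColumn(
  "nulls_so_far",
  count(when(col("sig01_diff").isNull, 1)).over(w)
)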
How could I achieve this? Any leads appreciated!
CodePudding user response:
You should use a window function with rowsBetween, using Window.currentRow - 1 as the upper bound. Here is an example:
import org.apache.spark.sql.functions.{count, when, col}
import org.apache.spark.sql.expressions.Window
import spark.implicits._

val df = List((1, Some(1)), (2, None), (3, Some(3)), (4, None), (5, Some(4)))
  .toDF("id", "nullable_value")

// All rows from the start of the window up to (but excluding) the current row.
val w = Window
  .orderBy(col("id"))
  .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)

// count() skips nulls, so count(when(isNull, 1)) counts exactly the null rows.
df.withColumn("null_count", count(when(col("nullable_value").isNull, 1)).over(w)).show()
+---+--------------+----------+
| id|nullable_value|null_count|
+---+--------------+----------+
|  1|             1|         0|
|  2|          null|         0|
|  3|             3|         1|
|  4|          null|         1|
|  5|             4|         2|
+---+--------------+----------+
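A note on how this works: count ignores nulls, so count(when(col("nullable_value").isNull, 1)) counts exactly the rows where the value is null. For the question's data you would additionally partition the window by ID and order by date, so the count restarts per ID. Keep in mind this is still a running total from the start of the partition; to get the number of nulls between two non-null records, you would also need to difference this total at the non-null boundaries.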
CodePudding user response:
It seems there is no direct function to find the number of null records between two non-null records.
A working solution is provided below.
%scala
import spark.implicits._
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.expressions.Window

val dfTest = Seq(
  (1, 1, 10, 1),
  (1, 2, 10, 2),
  (1, 3, 0, 1),
  (1, 4, 20, 1),
  (1, 5, 0, 1),
  (1, 6, 0, 1),
  (1, 7, 60, 0),
  (1, 8, 0, 2),
  (1, 9, 0, 1),
  (1, 10, 0, 1),
  (1, 11, 80, 1),
  (1, 7, 60, 1),
  (2, 1, 10, 1),
  (2, 2, 10, 2),
  (2, 3, 0, 1),
  (2, 4, 20, 0),
  (2, 5, 0, 0),
  (2, 6, 0, 1),
  (2, 7, 60, 1)
).toDF("ID", "date", "A", "B")

// Zeros stand in for nulls in the test data; turn them into real nulls.
val test = dfTest
  .withColumn("A", when(col("A") === 0, null).otherwise(col("A")))
  .withColumn("B", when(col("B") === 0, null).otherwise(col("B")))
The input DataFrame is shown below.
+---+----+----+----+
| ID|date|   A|   B|
+---+----+----+----+
|  1|   1|  10|   1|
|  1|   2|  10|   2|
|  1|   3|null|   1|
|  1|   4|  20|   1|
|  1|   5|null|   1|
|  1|   6|null|   1|
|  1|   7|  60|null|
|  1|   8|null|   2|
|  1|   9|null|   1|
|  1|  10|null|   1|
|  1|  11|  80|   1|
|  1|   7|  60|   1|
|  2|   1|  10|   1|
|  2|   2|  10|   2|
|  2|   3|null|   1|
|  2|   4|  20|null|
|  2|   5|null|null|
|  2|   6|null|   1|
|  2|   7|  60|   1|
+---+----+----+----+
The solution is as follows.
// w2: plain per-ID window, used for lag().
val w2 = Window.partitionBy("ID").orderBy("date")
// w3: all rows of the partition strictly before the current row.
val w3 = Window.partitionBy("ID").orderBy("date")
  .rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)

val newDf = test
  // Running count of nulls seen so far (current row excluded).
  .withColumn("A_cnt", count(when(col("A").isNull, 1)).over(w3))
  .withColumn("B_cnt", count(when(col("B").isNull, 1)).over(w3))
  // On the first non-null row after a null run, snapshot the running count; null elsewhere.
  .withColumn("A_null", when(col("A").isNotNull && lag(col("A"), 1).over(w2).isNull, col("A_cnt")).otherwise(null))
  .withColumn("B_null", when(col("B").isNotNull && lag(col("B"), 1).over(w2).isNull, col("B_cnt")).otherwise(null))
  // Subtract the previous snapshot to get the size of the null run that just ended.
  .withColumn("A_null_cnt", when(col("A").isNotNull && lag(col("A"), 1).over(w2).isNull, col("A_null") - last("A_null", true).over(w3)).otherwise(null))
  .withColumn("B_null_cnt", when(col("B").isNotNull && lag(col("B"), 1).over(w2).isNull, col("B_null") - last("B_null", true).over(w3)).otherwise(null))
  .drop("A_cnt", "B_cnt", "A_null", "B_null")
The output is as shown below.
+---+----+----+----+----------+----------+
| ID|date|   A|   B|A_null_cnt|B_null_cnt|
+---+----+----+----+----------+----------+
|  1|   1|  10|   1|      null|      null|
|  1|   2|  10|   2|      null|      null|
|  1|   3|null|   1|      null|      null|
|  1|   4|  20|   1|         1|      null|
|  1|   5|null|   1|      null|      null|
|  1|   6|null|   1|      null|      null|
|  1|   7|  60|null|         2|      null|
|  1|   7|  60|   1|      null|         1|
|  1|   8|null|   2|      null|      null|
|  1|   9|null|   1|      null|      null|
|  1|  10|null|   1|      null|      null|
|  1|  11|  80|   1|         3|      null|
|  2|   1|  10|   1|      null|      null|
|  2|   2|  10|   2|      null|      null|
|  2|   3|null|   1|      null|      null|
|  2|   4|  20|null|         1|      null|
|  2|   5|null|null|      null|      null|
|  2|   6|null|   1|      null|         2|
|  2|   7|  60|   1|         2|      null|
+---+----+----+----+----------+----------+
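The idea: A_cnt is the running number of nulls before each row; at each non-null row that directly follows a null (detected with lag), A_null snapshots that running count, and differencing consecutive snapshots via last("A_null", true) yields the length of the null run that just ended. For ID 1 and column A, the snapshots are 0 (date 1), 1 (date 4), 3 (date 7) and 6 (date 11), giving the gap counts 1, 2 and 3 above.
Transferred to the question's single sig01_diff column, this becomes something like the following. This is a sketch only: the DataFrame name df and the helper columns cnt and boundary are my own, and coalesce is used to produce the 0s the expected output shows instead of nulls.
import org.apache.spark.sql.functions._
import org.apache.spark.sql.expressions.Window

val w2 = Window.partitionBy("ID").orderBy("date")
val w3 = w2.rowsBetween(Window.unboundedPreceding, Window.currentRow - 1)

val result = df
  // Running number of nulls strictly before the current row.
  .withColumn("cnt", count(when(col("sig01_diff").isNull, 1)).over(w3))
  // Snapshot the running count on each non-null row that follows a null
  // (the first row of a partition also qualifies, since lag returns null there).
  .withColumn("boundary",
    when(col("sig01_diff").isNotNull && lag(col("sig01_diff"), 1).over(w2).isNull,
      col("cnt")))
  // Difference consecutive snapshots; fall back to the snapshot itself for the
  // first boundary in a partition, and to 0 for all other rows.
  .withColumn("null_count",
    coalesce(col("boundary") - last("boundary", true).over(w3),
      col("boundary"),
      lit(0)))
  .drop("cnt", "boundary")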