I'm using monotonically_increasing_id()
to count the number of passes in a row. That said, I want it to start back at zero whenever it hits a FAIL
.
df.withColumn("COLUMN_2", when(col("COLUMN_2")=="PASS", monotonically_increasing_id()).otherwise(0)).show()
Current output:
------------ ------------
|COLUMN_2 |COLUMN_3 |
------------ ------------
| PASS| 0|
| PASS| 1|
| PASS| 2|
| PASS| 3|
| PASS| 4|
| PASS| 5|
| PASS| 6|
| PASS| 7|
| PASS| 8|
| PASS| 9|
| PASS| 10|
| PASS| 11|
| FAIL| 0|
| PASS| 12|
| PASS| 13|
| PASS| 14|
| PASS| 15|
| PASS| 16|
| PASS| 17|
| PASS| 18|
------------ ------------
Desired output:
------------ ------------
|COLUMN_2 |COLUMN_3 |
------------ ------------
| PASS| 0|
| PASS| 1|
| PASS| 2|
| PASS| 3|
| PASS| 4|
| PASS| 5|
| PASS| 6|
| PASS| 7|
| PASS| 8|
| PASS| 9|
| PASS| 10|
| PASS| 11|
| FAIL| 0|
| PASS| 1|
| PASS| 2|
| PASS| 3|
| PASS| 4|
| PASS| 5|
| PASS| 6|
| PASS| 7|
------------ ------------
If I were to write what I'm looking to do in vanilla python, here is what it would look like:
count = 0
new_column = []
for i in range(len(df)):
if df["COLUMN_2"][i] == "PASS":
count = 1
new_column.append(count)
if df["COLUMN_2"][i] == "FAIL":
count = 0
new_column.append(count)
df["COLUMN_3"] = new_column
CodePudding user response:
If you had the order column, you could do what you want in the following approach using window functions.
Input:
from pyspark.sql import functions as F, Window as W
df = spark.createDataFrame(
[(101, 'PASS'),
(102, 'PASS'),
(103, 'PASS'),
(104, 'PASS'),
(105, 'PASS'),
(106, 'PASS'),
(107, 'PASS'),
(108, 'PASS'),
(109, 'PASS'),
(110, 'PASS'),
(111, 'PASS'),
(112, 'PASS'),
(113, 'FAIL'),
(114, 'PASS'),
(115, 'PASS'),
(116, 'PASS'),
(117, 'PASS'),
(118, 'PASS'),
(119, 'PASS'),
(120, 'PASS')],
['time', 'COLUMN_2'])
Script:
w1 = W.orderBy('time')
w2 = W.partitionBy('group2').orderBy('time')
df = df.withColumn('group1', F.when((F.row_number().over(w1) == 1) | (F.col('COLUMN_2') == 'FAIL'), 1).otherwise(0))
df = df.withColumn('group2', F.sum('group1').over(w1))
df = df.withColumn('COLUMN_3', F.row_number().over(w2) - 1)
df = df.drop('group1', 'group2')
df.show()
# ---- -------- --------
# |time|COLUMN_2|COLUMN_3|
# ---- -------- --------
# | 101| PASS| 0|
# | 102| PASS| 1|
# | 103| PASS| 2|
# | 104| PASS| 3|
# | 105| PASS| 4|
# | 106| PASS| 5|
# | 107| PASS| 6|
# | 108| PASS| 7|
# | 109| PASS| 8|
# | 110| PASS| 9|
# | 111| PASS| 10|
# | 112| PASS| 11|
# | 113| FAIL| 0|
# | 114| PASS| 1|
# | 115| PASS| 2|
# | 116| PASS| 3|
# | 117| PASS| 4|
# | 118| PASS| 5|
# | 119| PASS| 6|
# | 120| PASS| 7|
# ---- -------- --------
CodePudding user response:
ZygD raises a really good point, without a guranteed order you could run into issues. He suggests time, and I strongly believe that if you can't gurantee order bad things will happen on insert. I used ZygD's data set, and warning, I did not use a sort order that's dependable. If you insert new values the result would become unpredictable.
df.withColumn( "id", monotonically_increasing_id()
)\
.sort(col("id"))\
.withColumn( 'interim',
when(
col("COLUMN_2") == 'FAIL',
col('id')).otherwise(lit('0'))
)\ #makes a column Grouping
.withColumn( 'subtract', sum('interim').over(w1).cast("long") )\#special sauce that helps reset the value with each fail.
.withColumn("id",
col("id") - col("subtract")
).drop("interim","subtract")\
.show()
---- -------- -----------
|time|COLUMN_2| id|
---- -------- -----------
| 101| PASS| 0|
| 102| PASS| 1|
| 103| PASS| 8589934592|
| 104| PASS| 8589934593|
| 105| PASS|17179869184|
| 106| PASS|17179869185|
| 107| PASS|25769803776|
| 108| PASS|25769803777|
| 109| PASS|25769803778|
| 110| PASS|25769803779|
| 111| PASS|34359738368|
| 112| PASS|34359738369|
| 113| FAIL| 0|
| 114| PASS| 1|
| 115| PASS| 8589934592|
| 116| PASS| 8589934593|
| 117| PASS|17179869184|
| 118| PASS|17179869185|
| 119| PASS|17179869186|
| 120| PASS|17179869187|
---- -------- -----------
explain plan:
== Physical Plan ==
*(4) Project [time#0L, COLUMN_2#1, (id#526L - cast(_we0#538 as bigint)) AS id#544L]
- Window [sum(_w0#537) windowspecdefinition(id#526L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS _we0#538], [id#526L ASC NULLS FIRST]
- *(3) Sort [id#526L ASC NULLS FIRST], false, 0
- Exchange SinglePartition
- *(2) Project [time#0L, COLUMN_2#1, id#526L, cast(CASE WHEN (COLUMN_2#1 = FAIL) THEN cast(id#526L as string) ELSE 0 END as double) AS _w0#537]
- *(2) Sort [id#526L ASC NULLS FIRST], true, 0
- Exchange rangepartitioning(id#526L ASC NULLS FIRST, 200)
- *(1) Project [time#0L, COLUMN_2#1, monotonically_increasing_id() AS id#526L]
- Scan ExistingRDD[time#0L,COLUMN_2#1]
Answer provided by ZygD explain:
== Physical Plan ==
*(4) Project [time#0L, COLUMN_2#1, (_we0#567 - 1) AS COLUMN_3#566]
- Window [row_number() windowspecdefinition(group2#560L, time#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#567], [group2#560L], [time#0L ASC NULLS FIRST]
- *(3) Sort [group2#560L ASC NULLS FIRST, time#0L ASC NULLS FIRST], false, 0
- *(3) Project [time#0L, COLUMN_2#1, group2#560L]
- Window [sum(cast(group1#554 as bigint)) windowspecdefinition(time#0L ASC NULLS FIRST, specifiedwindowframe(RangeFrame, unboundedpreceding$(), currentrow$())) AS group2#560L], [time#0L ASC NULLS FIRST]
- *(2) Project [time#0L, COLUMN_2#1, CASE WHEN ((_we0#555 = 1) || (COLUMN_2#1 = FAIL)) THEN 1 ELSE 0 END AS group1#554]
- Window [row_number() windowspecdefinition(time#0L ASC NULLS FIRST, specifiedwindowframe(RowFrame, unboundedpreceding$(), currentrow$())) AS _we0#555], [time#0L ASC NULLS FIRST]
- *(1) Sort [time#0L ASC NULLS FIRST], false, 0
- Exchange SinglePartition
- Scan ExistingRDD[time#0L,COLUMN_2#1]
So depending on the data ZygD may be faster, as my solutions has more exchanges (but the same amount of shuffles) just depends on what you need, your data and what's more readable.