Sl Now
1 D
2 D
3 D
4 R
5 R
6 C
7 C
8 C
9 D
10 P
11 R
12 R
13 D
I have a dataset like above.
Sl Now lead
1 D R
2 D R
3 D R
4 R C
5 R C
6 C D
7 C D
8 C D
9 D P
10 P R
11 R D
12 R D
13 D
I want to add a column called "lead" that shows the next distinct value of the "Now" column, repeated for every row of the current run of values, as shown above. Can this be done with PySpark?
CodePudding user response:
Here is how I would do it.
Prep data
a = "DDDRRCCCDPRRD"
a = list(zip(range(len(a)), a))  # (Sl, Now) pairs
b = ["Sl", "Now"]
df = spark.createDataFrame(a, b)
df.show()
+---+---+
| Sl|Now|
+---+---+
|  0|  D|
|  1|  D|
|  2|  D|
|  3|  R|
|  4|  R|
|  5|  C|
|  6|  C|
|  7|  C|
|  8|  D|
|  9|  P|
| 10|  R|
| 11|  R|
| 12|  D|
+---+---+
Imports
from pyspark.sql import functions as F
from pyspark.sql import Window as W
Add an incremental ID
# flag each row where the value changes from the previous row,
# then take a running sum of the flags to get a group id
df = df.withColumn("id", F.when(F.col("Now") == F.lag("Now").over(W.orderBy("Sl")), 0).otherwise(1))
df = df.withColumn("id", F.sum("id").over(W.orderBy("Sl")))
df.show()
+---+---+---+
| Sl|Now| id|
+---+---+---+
|  0|  D|  1|
|  1|  D|  1|
|  2|  D|  1|
|  3|  R|  2|
|  4|  R|  2|
|  5|  C|  3|
|  6|  C|  3|
|  7|  C|  3|
|  8|  D|  4|
|  9|  P|  5|
| 10|  R|  6|
| 11|  R|  6|
| 12|  D|  7|
+---+---+---+
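As a quick sanity check outside Spark, the same change-flag-plus-running-sum logic can be reproduced in plain Python. This is only an illustrative sketch, not part of the PySpark answer:

```python
from itertools import accumulate

now = list("DDDRRCCCDPRRD")
# 1 whenever the value differs from the previous row, else 0
flags = [1] + [int(cur != prev) for prev, cur in zip(now, now[1:])]
# running sum of the flags gives the incremental group id
ids = list(accumulate(flags))
print(ids)  # [1, 1, 1, 2, 2, 3, 3, 3, 4, 5, 6, 6, 7]
```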
Get the lead value (method 1)
# each id+1 window holds exactly one distinct value: the next group's value;
# explode_outer keeps the last group as null (its window is empty).
# Assign to a new variable so df keeps the id column for method 2 below.
df_lead = df.withColumn("lead", F.collect_set("Now").over(W.orderBy("id").rangeBetween(1, 1)))
df_lead = df_lead.select("Sl", "Now", F.explode_outer("lead").alias("lead"))
df_lead.show()
+---+---+----+
| Sl|Now|lead|
+---+---+----+
|  0|  D|   R|
|  1|  D|   R|
|  2|  D|   R|
|  3|  R|   C|
|  4|  R|   C|
|  5|  C|   D|
|  6|  C|   D|
|  7|  C|   D|
|  8|  D|   P|
|  9|  P|   R|
| 10|  R|   D|
| 11|  R|   D|
| 12|  D|null|
+---+---+----+
Get the lead value (method 2)
Starting again from the DataFrame that still has the id column:
# shift each group's value back onto the previous group's id, then join
df2 = df.select(F.col("Now").alias("lead"), (F.col("id") - 1).alias("id")).distinct()
df1 = df.join(df2, how="left", on="id")
df1.select("Sl", "Now", "lead").show()
+---+---+----+
| Sl|Now|lead|
+---+---+----+
|  0|  D|   R|
|  1|  D|   R|
|  2|  D|   R|
|  3|  R|   C|
|  4|  R|   C|
|  5|  C|   D|
|  6|  C|   D|
|  7|  C|   D|
|  8|  D|   P|
|  9|  P|   R|
| 10|  R|   D|
| 11|  R|   D|
| 12|  D|null|
+---+---+----+
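For reference, the expected "lead" column can be cross-checked with a small pure-Python sketch (again only an illustration): since each run of equal values is maximal, the first value after a row that differs from it is exactly the next group's value.

```python
now = list("DDDRRCCCDPRRD")
# for each row, the next distinct value; None when no later group exists
lead = [next((x for x in now[i + 1:] if x != v), None) for i, v in enumerate(now)]
print(lead)  # ['R', 'R', 'R', 'C', 'C', 'D', 'D', 'D', 'P', 'R', 'D', 'D', None]
```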