So I have an input PySpark dataframe that looks like the following:
df = spark.createDataFrame(
    [("1111", "clark"),
     ("1111", "john"),
     ("2222", "bob"),
     ("3333", "jane"),
     ("3333", "lucie"),
     ("3333", "matt")],
    ["column1", "column2"]
)
| column1 | column2 |
| ------- | ------- |
| 1111 | clark |
| 1111 | john |
| 2222 | bob |
| 3333 | jane |
| 3333 | lucie |
| 3333 | matt |
My goal is to create an incremental id that increments per group of values from the column1 column, so I get something like:
df_out = spark.createDataFrame(
    [("1111", "clark", 1),
     ("1111", "john", 2),
     ("2222", "bob", 1),
     ("3333", "jane", 1),
     ("3333", "lucie", 2),
     ("3333", "matt", 3)],
    ["column1", "column2", "incremental_id"]
)
| column1 | column2 | incremental_id |
| ------- | ------- | -------------- |
| 1111 | clark | 1 |
| 1111 | john | 2 |
| 2222 | bob | 1 |
| 3333 | jane | 1 |
| 3333 | lucie | 2 |
| 3333 | matt | 3 |
I tried using a window function as follows, but it didn't give me the incremental_id values I was hoping for per group of values from the column1 column.
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
w = Window().orderBy("column1")
df_out = df.withColumn("incremental_id", row_number().over(w))
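For reference, this is roughly what that attempt produces; without a partitionBy, row_number() treats the whole dataframe as a single partition and numbers every row from 1 to 6 instead of restarting for each column1 group:

df_out.show()
# +-------+-------+--------------+
# |column1|column2|incremental_id|
# +-------+-------+--------------+
# |   1111|  clark|             1|
# |   1111|   john|             2|
# |   2222|    bob|             3|
# |   3333|   jane|             4|
# |   3333|  lucie|             5|
# |   3333|   matt|             6|
# +-------+-------+--------------+
# (rows sharing the same column1 value tie on the orderBy column,
#  so their relative order, and hence their exact numbers, is not guaranteed)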
CodePudding user response:
The only thing missing from your code is to specify the partition column. In your example, it would look something like
w = Window.partitionBy("column1").orderBy("column2")
partitionBy("column1") restarts row_number() for each distinct column1 value, and orderBy("column2") determines the order of rows within each group (alphabetical here, which matches your desired output).
Everything else looks fine!
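Putting it together, a minimal sketch of the full solution (ordering rows within each group by column2, which is what the desired output above shows):

from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Restart the numbering for each distinct column1 value;
# order rows within a group by column2 so the numbering is deterministic
w = Window.partitionBy("column1").orderBy("column2")
df_out = df.withColumn("incremental_id", row_number().over(w))
df_out.show()
# +-------+-------+--------------+
# |column1|column2|incremental_id|
# +-------+-------+--------------+
# |   1111|  clark|             1|
# |   1111|   john|             2|
# |   2222|    bob|             1|
# |   3333|   jane|             1|
# |   3333|  lucie|             2|
# |   3333|   matt|             3|
# +-------+-------+--------------+
# (the order in which the groups are displayed by show() may vary)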