I have a dataframe in pyspark that looks like this
+------------------+--------------------+
|    Community_Area|                Date|
+------------------+--------------------+
|          New City|09/05/2015 01:30:...|
|            Austin|09/04/2015 11:30:...|
|          New City|09/05/2015 12:01:...|
|          Avondale|09/05/2015 12:45:...|
|            Austin|09/04/2015 01:00:...|
|    Auburn Gresham|09/05/2015 10:55:...|
|         West Town|09/04/2015 06:00:...|
|          Avondale|09/05/2015 01:00:...|
I'm trying to add a count column so that if a Community_Area shows up more than once on the same day, the count increases as shown below; counting the first appearance as 1 seems like the right way.
+------------------+--------------------+-----+
|    Community_Area|                Date|Count|
+------------------+--------------------+-----+
|          New City|09/05/2015 01:30:...|    1|
|            Austin|09/04/2015 11:30:...|    1|
|          New City|09/05/2015 12:01:...|    2|
|          Avondale|09/05/2015 12:45:...|    1|
|            Austin|09/04/2015 01:00:...|    2|
|    Auburn Gresham|09/05/2015 10:55:...|    1|
|         West Town|09/04/2015 06:00:...|    1|
|          Avondale|09/05/2015 01:00:...|    2|
...
The goal is to add a rolling 7-day sum column using the window function so the final table has 3 columns (Community, Date, Rolling 7-day sum).
My initial approach is to create the count column first and then use it in the window function.
The code I've used to do that is
df4b = df4b.groupby(["Community_Area", "Date"])["Community_Area"].count().reset_index(name="count")
df4b.show()
CodePudding user response:
Example:
import pandas as pd
df = pd.DataFrame([list('ABACBDEC')], index=['col1']).T
Output of df:
  col1
0    A
1    B
2    A
3    C
4    B
5    D
6    E
7    C
Use groupby with cumcount:
df.groupby('col1').cumcount() + 1
Result:
0    1
1    1
2    2
3    1
4    2
5    1
6    1
7    2
dtype: int64
Then assign the result back to the dataframe as a new column.
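For instance, writing it to a new column (the name "count" below is just illustrative):
df['count'] = df.groupby('col1').cumcount() + 1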
CodePudding user response:
Use a window to partition by the desired groups and row_number() to assign a serial number:
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame(data=[["New City","09/05/2015"],["Austin","09/04/2015"],["New City","09/05/2015"],["Avondale","09/05/2015"],["Austin","09/04/2015"],["Auburn Gresham","09/05/2015"],["West Town","09/04/2015"],["Avondale","09/05/2015"]], schema=["Community_Area", "Date"])
df = df.withColumn("Date", F.to_date("Date", format="MM/dd/yyyy"))
w = Window.partitionBy("Date", "Community_Area").orderBy("Date", "Community_Area")
df = df.withColumn("Count", F.row_number().over(w))
[Out]:
+--------------+----------+-----+
|Community_Area|      Date|Count|
+--------------+----------+-----+
|        Austin|2015-09-04|    1|
|        Austin|2015-09-04|    2|
|     West Town|2015-09-04|    1|
|Auburn Gresham|2015-09-05|    1|
|      Avondale|2015-09-05|    1|
|      Avondale|2015-09-05|    2|
|      New City|2015-09-05|    1|
|      New City|2015-09-05|    2|
+--------------+----------+-----+
UPDATE
You have also asked for a weekly cumulative sum. For that, partition by year, week of the year, and area, and then sum the count over each partition.
PS: I have changed the input data to test the sum across different weeks. Since your ask and the given expected output do not match, I have assumed that you need a total for every week by area.
import pyspark.sql.functions as F
from pyspark.sql import Window
df = spark.createDataFrame(data=[["New City","09/03/2015"],["Austin","09/03/2015"],["New City","09/03/2015"],["Austin","09/03/2015"],["New City","09/05/2015"],["Austin","09/05/2015"],["New City","09/07/2015"],["Austin","09/07/2015"],["Austin","09/09/2015"],["New City","09/09/2015"],["Austin","09/11/2015"],["New City","09/11/2015"]], schema=["Community_Area", "Date"])
df = df.withColumn("Date", F.to_date("Date", format="MM/dd/yyyy"))
df = df.withColumn("weekofyear", F.weekofyear("Date"))
df = df.withColumn("year", F.year("Date"))
w = Window.partitionBy("Date", "weekofyear", "Community_Area").orderBy("Date", "weekofyear", "Community_Area")
df = df.withColumn("Count", F.row_number().over(w))
w2 = Window.partitionBy("year", "weekofyear", "Community_Area").orderBy("year", "weekofyear", "Community_Area")
df = df.withColumn("Rolling 7-day sum", F.sum("Count").over(w2))
[Out]:
+--------------+----------+----------+----+-----+-----------------+
|Community_Area|      Date|weekofyear|year|Count|Rolling 7-day sum|
+--------------+----------+----------+----+-----+-----------------+
|        Austin|2015-09-03|        36|2015|    1|                4|
|        Austin|2015-09-03|        36|2015|    2|                4|
|        Austin|2015-09-05|        36|2015|    1|                4|
|      New City|2015-09-03|        36|2015|    1|                4|
|      New City|2015-09-03|        36|2015|    2|                4|
|      New City|2015-09-05|        36|2015|    1|                4|
|        Austin|2015-09-07|        37|2015|    1|                3|
|        Austin|2015-09-09|        37|2015|    1|                3|
|        Austin|2015-09-11|        37|2015|    1|                3|
|      New City|2015-09-07|        37|2015|    1|                3|
|      New City|2015-09-09|        37|2015|    1|                3|
|      New City|2015-09-11|        37|2015|    1|                3|
+--------------+----------+----------+----+-----+-----------------+
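If what you actually need is a true rolling 7-day window (the current day plus the 6 preceding days) per area rather than week-of-year totals, one possible sketch is a range-based window ordered by a day number. The names days and w7 below are illustrative, and each input row is assumed to be one incident, so the rolling sum is simply a row count:
# Sketch only: rolling 7-day count per Community_Area, assuming one row per incident
days = F.datediff(F.col("Date"), F.to_date(F.lit("1970-01-01")))  # day number, so rangeBetween counts in days
w7 = Window.partitionBy("Community_Area").orderBy(days).rangeBetween(-6, 0)
df = df.withColumn("Rolling 7-day sum", F.count("*").over(w7))
df.show()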