Add a new column in PySpark that contains a unique count


I have a DataFrame in PySpark that looks like this:

+------------------+--------------------+
|    Community_Area|                Date|
+------------------+--------------------+
|          New City|09/05/2015 01:30:...|
|            Austin|09/04/2015 11:30:...|
|          New City|09/05/2015 12:01:...|
|          Avondale|09/05/2015 12:45:...|
|            Austin|09/04/2015 01:00:...|
|    Auburn Gresham|09/05/2015 10:55:...|
|         West Town|09/04/2015 06:00:...|
|          Avondale|09/05/2015 01:00:...|
+------------------+--------------------+

I'm trying to add a count column so that if a Community_Area shows up more than once on the same day, the count increases as shown below; numbering the first appearance as 1 seems like the right way.

+------------------+--------------------+-----+
|    Community_Area|                Date|Count|
+------------------+--------------------+-----+
|          New City|09/05/2015 01:30:...|    1|
|            Austin|09/04/2015 11:30:...|    1|
|          New City|09/05/2015 12:01:...|    2|
|          Avondale|09/05/2015 12:45:...|    1|
|            Austin|09/04/2015 01:00:...|    2|
|    Auburn Gresham|09/05/2015 10:55:...|    1|
|         West Town|09/04/2015 06:00:...|    1|
|          Avondale|09/05/2015 01:00:...|    2|
+------------------+--------------------+-----+

...

The goal is to add a rolling 7-day sum column using a window function, so the final table has three columns (Community_Area, Date, Rolling 7-day sum).

My initial approach is to build the count column first and then use it in the window function.

The code I've used to do that is:

df4b = df4b.groupby(["Community_Area", "Date"])["Community_Area"].count().reset_index(name="count")
df4b.show()
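
Note that this is pandas groupby syntax; a rough PySpark equivalent would presumably be something like the sketch below, which only returns one total per group rather than the per-row running count I'm after.

# Assumption: df4b is a PySpark DataFrame with Community_Area and Date columns.
# groupBy().count() collapses the data to one row per (Community_Area, Date)
# with the total, so it cannot produce the per-row running count shown above.
counts = df4b.groupBy("Community_Area", "Date").count()
counts.show()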

CodePudding user response:

Example (in pandas, to illustrate the idea):

import pandas as pd
df = pd.DataFrame([list('ABACBDEC')], index=['col1']).T

output(df):

    col1
0   A
1   B
2   A
3   C
4   B
5   D
6   E
7   C

Use groupby with cumcount:

df.groupby('col1').cumcount() + 1

result:

0    1
1    1
2    2
3    1
4    2
5    1
6    1
7    2
dtype: int64

Finally, assign the result back to the DataFrame as a new column.
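
A minimal sketch of that last step (the column name Count is an assumption, chosen to match the desired output in the question):

df['Count'] = df.groupby('col1').cumcount() + 1   # hypothetical column name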

CodePudding user response:

Use a window to partition by the desired groups and row_number() to assign a serial number within each group:

import pyspark.sql.functions as F
from pyspark.sql import Window

# Sample data: one row per record, with Date as an MM/dd/yyyy string.
df = spark.createDataFrame(data=[["New City","09/05/2015"],["Austin","09/04/2015"],["New City","09/05/2015"],["Avondale","09/05/2015"],["Austin","09/04/2015"],["Auburn Gresham","09/05/2015"],["West Town","09/04/2015"],["Avondale","09/05/2015"]], schema=["Community_Area", "Date"])
df = df.withColumn("Date", F.to_date("Date", format="MM/dd/yyyy"))

# Number the rows within each (Date, Community_Area) group: the first
# occurrence gets 1, the next gets 2, and so on.
w = Window.partitionBy("Date", "Community_Area").orderBy("Date", "Community_Area")
df = df.withColumn("Count", F.row_number().over(w))
df.show()

[Out]:
+--------------+----------+-----+
|Community_Area|      Date|Count|
+--------------+----------+-----+
|        Austin|2015-09-04|    1|
|        Austin|2015-09-04|    2|
|     West Town|2015-09-04|    1|
|Auburn Gresham|2015-09-05|    1|
|      Avondale|2015-09-05|    1|
|      Avondale|2015-09-05|    2|
|      New City|2015-09-05|    1|
|      New City|2015-09-05|    2|
+--------------+----------+-----+
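
If the running count should also follow the time of day within each date, one option is to parse the full timestamp and order the window by it. A rough sketch, assuming the raw strings (here called Date_str) carry a time component in a format such as "MM/dd/yyyy hh:mm:ss a" (both the column name and the format are assumptions, since the sample values are truncated):

# Hypothetical: Date_str holds the original string, e.g. "09/05/2015 01:30:00 PM".
df = df.withColumn("ts", F.to_timestamp("Date_str", "MM/dd/yyyy hh:mm:ss a"))
w = Window.partitionBy(F.to_date("ts"), "Community_Area").orderBy("ts")
df = df.withColumn("Count", F.row_number().over(w))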

UPDATE

You have asked for a weekly cumulative sum as well. For that, partition by year, week of the year, and area, and then sum the count over each partition.

PS - I have changed the input data to test the sum across different weeks. Since your ask and the given expected output do not match, I have assumed you need a total for every week by area.

import pyspark.sql.functions as F
from pyspark.sql import Window

df = spark.createDataFrame(data=[["New City","09/03/2015"],["Austin","09/03/2015"],["New City","09/03/2015"],["Austin","09/03/2015"],["New City","09/05/2015"],["Austin","09/05/2015"],["New City","09/07/2015"],["Austin","09/07/2015"],["Austin","09/09/2015"],["New City","09/09/2015"],["Austin","09/11/2015"],["New City","09/11/2015"]], schema=["Community_Area", "Date"])
df = df.withColumn("Date", F.to_date("Date", format="MM/dd/yyyy"))
df = df.withColumn("weekofyear", F.weekofyear("Date"))
df = df.withColumn("year", F.year("Date"))

# Per-day running count within each (Date, Community_Area) group, as before.
w = Window.partitionBy("Date", "weekofyear", "Community_Area").orderBy("Date", "weekofyear", "Community_Area")
df = df.withColumn("Count", F.row_number().over(w))

# Weekly total per area: sum the daily counts over each (year, week, area) partition.
w2 = Window.partitionBy("year", "weekofyear", "Community_Area").orderBy("year", "weekofyear", "Community_Area")
df = df.withColumn("Rolling 7-day sum", F.sum("Count").over(w2))
df.show()

[Out]:
+--------------+----------+----------+----+-----+-----------------+
|Community_Area|      Date|weekofyear|year|Count|Rolling 7-day sum|
+--------------+----------+----------+----+-----+-----------------+
|        Austin|2015-09-03|        36|2015|    1|                4|
|        Austin|2015-09-03|        36|2015|    2|                4|
|        Austin|2015-09-05|        36|2015|    1|                4|
|      New City|2015-09-03|        36|2015|    1|                4|
|      New City|2015-09-03|        36|2015|    2|                4|
|      New City|2015-09-05|        36|2015|    1|                4|
|        Austin|2015-09-07|        37|2015|    1|                3|
|        Austin|2015-09-09|        37|2015|    1|                3|
|        Austin|2015-09-11|        37|2015|    1|                3|
|      New City|2015-09-07|        37|2015|    1|                3|
|      New City|2015-09-09|        37|2015|    1|                3|
|      New City|2015-09-11|        37|2015|    1|                3|
+--------------+----------+----------+----+-----+-----------------+
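
The column above is a weekly total rather than a true rolling sum. For an actual rolling 7-day count per area (each row counting all records for that Community_Area in the 7 days up to and including its Date), a range-based window frame over the date expressed in seconds could be used instead. A minimal sketch, under the assumption that df still has the date-typed Date column from above:

# Sketch of a rolling 7-day count per Community_Area (an assumption about what
# "rolling 7-day sum" should mean). The frame spans the 6 days before each
# row's Date plus the Date itself, with the range expressed in seconds.
seconds_per_day = 86400
w7 = (
    Window.partitionBy("Community_Area")
    .orderBy(F.unix_timestamp("Date"))
    .rangeBetween(-6 * seconds_per_day, 0)
)
df = df.withColumn("Rolling 7-day sum", F.count(F.lit(1)).over(w7))
df.show()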