I have a pyspark dataframe with following columns
source_cd Day Date hour five_min_block five_min_block_volume
Here, the dates are varying from 31st January 2020 to 31st March 2021. There are 'Day' fields accordingly. Also, source_cd has 5 categories, the hours for every unique date vary from 0 to 23 and corresponding five_min_block varies from 1 to 12. And then I have my value column named as five_min_block_volume.
Now there can be any value in this five_min_block_volume field, starting from 0 to any positive definite number. What I want to do is to count the percentage of zeroes for this column, when aggregated by certain groupby variables ('Date' will never be a part of this groupby variable).
So assume that I want to group it by 'Source_cd', 'Day', 'hour' and 'five_min_block' (and maybe perform mean aggregation for the five_min_block_volume column as the output column). Essentially, my new dataframe will now contain source_cd,Day,hour,five_min_block fields, and no date field now.
Lets say, for a particular combination of source_cd,Day,hour,five_min_block, there were 50 entries in my original dataframe. Out of those 50 entries, 20 had five_min_block_volume as 0 value. So I want to display 40% as my 'percentage of zeroes' column as the newly created column, for this combination, in this grouped dataframe. And likewise for all other rows. I want to acheive this using pyspark. How do I go about doing this
CodePudding user response:
May I suggest for quicker response and more clarity, it is useful if you post some code which someone trying to answer your question could easily copy and paste to produce the example you describe? In any case, I have tried to reproduce your example from the description as best I can below.
Note that the solution is only a couple of lines of code at the end. Hopefully it can help.
Reproducing the example
import pandas as pd
import numpy as np
import pyspark.sql.functions as func
# create the dummy data in pandas then convert to pyspark df
pdf = pd.DataFrame(columns=['source_cd', 'Day', 'Date', 'hour', 'five_min_block', 'five_min_block_volume'])
# create the date range by 5 minute blocks
pdf['Date'] = pd.date_range(start='2020-01-31', end='2020-03-31', freq='5min')
n = pdf.shape[0]
# extract hour and day
pdf['Day'] = pdf['Date'].dt.day
pdf['hour'] = pdf['Date'].dt.hour
pdf['date-temp'] = pdf['Date'].dt.date
# generate the 5 min block labels
pdf['five_min_block'] = 1
pdf['five_min_block'] = pdf.groupby(['date-temp', 'hour'])['five_min_block'].cumsum()
pdf.drop('date-temp', axis=1, inplace=True)
# random source column
pdf['source_cd'] = np.random.randint(low=0, high=5, size=n)
# random volumes, and add some extra zeros
pdf['five_min_block_volume'] = np.random.randint(low=0, high=20000, size=n)
pdf['five_min_block_volume'].iloc[np.random.choice(range(n),size=int(0.2*n))] = 0
Convert to spark dataframe, and to the grouping as described in question
sdf = spark.createDataFrame(pdf)
grouping_columns = ['Source_cd', 'Day', 'hour', 'five_min_block']
sdf.groupBy(grouping_columns).agg(
func.mean(func.col('five_min_block_volume')).alias('avg_of_block_volume'),
func.mean((func.col('five_min_block_volume') == 0).cast('float')).alias('percent_blocks_with_0_volume')
).show()
Output:
CodePudding user response:
You could use something like this:
@funcs.pandas_udf('float', funcs.PandasUDFType.GROUPED_AGG)
def percentage_of_zeroes_agg(percentage_of_zeroes_col: funcs.col) -> float:
return percentage_of_zeroes_col.sum() / percentage_of_zeroes_col.count()
# == Example =============================================================================
# Columns to group dataframe by
groupby_columns = ['Source_cd', 'Day', 'hour']
# Aggregation expression, that computes the rate of zeroes for each group.
aggregation = percentage_of_zeroes_agg(df.percentage_of_zeroes).alias('percentage_of_zeroes')
# Perform the groupby operation
grouped_df = df.groupBy(*groupby_columns).agg(aggregation)
Full code
Here's the entire code, including some helper functions I've created to build a sample dataframe, based on the columns descriptions you gave.
# == Necessary Imports ===================================================================
from __future__ import annotations
import string
import pandas as pd
import numpy as np
import pyspark
from pyspark.sql import functions as funcs
from dateutil.relativedelta import relativedelta
# == Define spark session ================================================================
spark = pyspark.sql.SparkSession.builder.getOrCreate()
# == Helper functions to generate sample dataframe =======================================
# You can ignore these functions, as their purpose is only to create a sample dataframe to
# show how to solve your problem
def get_random_source_cd(n: int, num_cats: int = 5) -> list[str]:
source_cd_cats = string.ascii_uppercase[:num_cats]
return list(
map(source_cd_cats.__getitem__, np.random.randint(0, num_cats, n))
)
def get_random_hours(n: int) -> list[int]:
return np.random.randint(0, 23, n).tolist()
def get_random_dates(
n: int,
start_date: str | pd.Timestamp,
end_date: str | pd.Timestamp | None = None,
days: int | None = None,
) -> list[pd.Timestamp]:
start_date = pd.to_datetime(start_date)
if end_date is None:
if days is None:
days = n * 2
end_date = start_date relativedelta(days=int(days))
else:
end_date = pd.to_datetime(end_date)
possible_dates = pd.date_range(start_date, end_date, freq='d').to_series()
return list(
map(possible_dates.__getitem__, np.random.randint(0, len(possible_dates), n))
)
def get_random_five_min_blocks(n: int) -> list[int]:
return np.random.randint(0, 13, n).tolist()
def generate_random_frame(n: int, **kwargs) -> pd.DataFrame:
dates = get_random_dates(
n, '2022-06-01', end_date=kwargs.get('end_date', None), days=kwargs.get('days', None)
)
days = list(map(lambda date: date.day, dates))
return spark.createDataFrame(
pd.DataFrame(
{
'source_cd': get_random_source_cd(n),
'Day': days,
'Date': dates,
'hour': get_random_hours(n),
'five_min_block': get_random_five_min_blocks(n),
'five_min_block_volume': np.random.random(n),
}
)
).withColumn(
'percentage_of_zeroes',
funcs.when(funcs.col('five_min_block') == 0, 1).otherwise(0)
)
# == User defined function used during aggregation =======================================
@funcs.pandas_udf('float', funcs.PandasUDFType.GROUPED_AGG)
def percentage_of_zeroes_agg(percentage_of_zeroes_col: funcs.col) -> float:
"""Pandas user defined function to compute the percentage of zeroes during aggregation.
Parameters
----------
percentage_of_zeroes_col : funcs.col
The `percentage_of_zeroes_col` column, as `pyspark.sql.column.Column`.
You can specify this parameter like so:
.. code-block:: python
groupby_columns = ['Source_cd', 'Day']
aggregation = percentage_of_zeroes_agg(df.percentage_of_zeroes).alias('percentage_of_zeroes')
grouped_df = df.groupBy(*groupby_columns).agg(aggregation)
In the above example, the aggregation variable shows how you can
use this function.
Returns
-------
float
The rate of values with column `percentage_of_zeroes` equal to 1.
Notes
-----
The `percentage_of_zeroes` column contains the value 1, when the column
`five_min_block` equals zero, and 0 otherwise. Therefore, when you sum all values,
you get the total count of rows from a given group that equal 0. The `count`
returns the number of observations (rows) from each group.
Dividing the sum by count, you get the ratio of zeroes on a given group.
"""
return percentage_of_zeroes_col.sum() / percentage_of_zeroes_col.count()
# == Example =============================================================================
# Generate a randomized Spark Dataframe, based on your columns specifications
df = generate_random_frame(50_000, end_date='2023-12-31')
# Columns to group dataframe by
groupby_columns = ['Source_cd', 'Day', 'hour']
# Aggregation expression, that computes the rate of zeroes for each group.
# NOTE: edit the `.alias` parameter, to change the name of the column that stores
# the aggregation results.
aggregation = percentage_of_zeroes_agg(df.percentage_of_zeroes).alias('percentage_of_zeroes')
# Perform the groupby operation
grouped_df = (
df
.groupBy(*groupby_columns)
.agg(aggregation)
# OPTIONAL: uncomment the next line, to sort the grouped dataframe
# by a set of columns (statement has a heavy impact on performance)
# .orderBy('count_of_zeroes', ascending=False)
)
# OPTIONAL: create column `pretty_percentage_of_zeroes` to store results from aggregation
# in percentage format.
grouped_df = grouped_df.withColumn(
'pretty_percentage_of_zeroes',
funcs.concat(
(funcs.format_number(grouped_df.percentage_of_zeroes * 100, 2)).cast('string'),
funcs.lit('%')
)
)
grouped_df.show()
# --------- --- ---- -------------------- --------------- ---------------------------
# |Source_cd|Day|hour|percentage_of_zeroes|count_of_zeroes|pretty_percentage_of_zeroes|
# --------- --- ---- -------------------- --------------- ---------------------------
# | A| 1| 0| 0.07692308| 1| 7.69%|
# | A| 1| 1| 0.11764706| 2| 11.76%|
# | A| 1| 2| 0.083333336| 1| 8.33%|
# | A| 1| 3| 0.0| 0| 0.00%|
# | A| 1| 4| 0.13333334| 2| 13.33%|
# | A| 1| 5| 0.0| 0| 0.00%|
# | A| 1| 6| 0.2| 2| 20.00%|
# | A| 1| 7| 0.0| 0| 0.00%|
# | A| 1| 8| 0.1764706| 3| 17.65%|
# | A| 1| 9| 0.10526316| 2| 10.53%|
# | A| 1| 10| 0.0| 0| 0.00%|
# | A| 1| 11| 0.0| 0| 0.00%|
# | A| 1| 12| 0.125| 2| 12.50%|
# | A| 1| 13| 0.05882353| 1| 5.88%|
# | A| 1| 14| 0.055555556| 1| 5.56%|
# | A| 1| 15| 0.0625| 1| 6.25%|
# | A| 1| 16| 0.083333336| 1| 8.33%|
# | A| 1| 17| 0.071428575| 1| 7.14%|
# | A| 1| 18| 0.11111111| 1| 11.11%|
# | A| 1| 19| 0.06666667| 1| 6.67%|
# --------- --- ---- -------------------- --------------- ---------------------------