I have a problem: I would like to calculate the turnover for each customer over the last 6 months. The method below works on my dummy data, but unfortunately it is far too slow on my real data. How can I rewrite this so that it performs faster?
Dataframe
customerId fromDate sales
0 1 2022-06-01 100
1 1 2022-05-25 20
2 1 2022-05-25 50
3 1 2022-05-20 30
4 1 2021-09-05 40
5 2 2022-06-02 80
6 3 2021-03-01 50
7 3 2021-02-01 20
Code
from datetime import datetime
from dateutil.relativedelta import relativedelta
import pandas as pd
def find_last_date(date_: datetime) -> datetime:
    six_months = date_ + relativedelta(months=-6)
    return six_months

def sum_func(row: pd.Series, df: pd.DataFrame) -> int:
    return df[
        (df["customerId"] == row["customerId"])
        & (row["fromDate"] + relativedelta(months=-6) <= df["fromDate"])
        & (df["fromDate"] <= row["fromDate"])
    ]["sales"].sum()
d = {
"customerId": [1, 1, 1, 1, 1, 2, 3, 3],
"fromDate": [
"2022-06-01",
"2022-05-25",
"2022-05-25",
"2022-05-20",
"2021-09-05",
"2022-06-02",
"2021-03-01",
"2021-02-01",
],
"sales": [100, 20, 50, 30, 40, 80, 50, 20],
}
df = pd.DataFrame(data=d)
df["fromDate"] = pd.to_datetime(df["fromDate"], errors="coerce")
df["last_month"] = df["fromDate"].apply(find_last_date)
df["total_sales"]=df[["customerId", "fromDate"]].apply(lambda x: sum_func(x, df), axis=1)
print(df)
What I want
customerId fromDate sales last_month total_sales
0 1 2022-06-01 100 2022-03-01 200 # 100 + 20 + 50 + 30
1 1 2022-05-25 20 2022-02-25 100 # 20 + 50 + 30
2 1 2022-05-25 50 2022-02-25 100 # 50 + 20 + 30
3 1 2022-05-20 30 2022-02-20 30 # 30
4 1 2021-09-05 40 2021-06-05 40 # 40
5 2 2022-06-02 80 2022-03-02 80 # 80
6 3 2021-03-01 50 2020-12-01 70 # 50 + 20
7 3 2021-02-01 20 2020-11-01 20 # 20
print(df['customerId'].value_counts().describe())
count 53979.000
mean 87.404
std 1588.450
min 1.000
25% 2.000
50% 6.000
75% 22.000
max 205284.000
print(df['fromDate'].agg((min, max)))
min 2021-02-22
max 2022-03-26
CodePudding user response:
Use numpy broadcasting per group: for each row, build a boolean mask selecting the rows whose dates fall within its 6-month window (True keeps the sale, anything outside the window contributes 0), then take the dot product with the sales values to write the windowed sum to a new column:
df["fromDate"] = pd.to_datetime(df["fromDate"], errors="coerce")
df["last_month"] = df["fromDate"] - pd.offsets.DateOffset(months=6)
def f(x):
d1 = x["fromDate"].to_numpy()
d2 = x["last_month"].to_numpy()
mask = (d2[:, None]<=d1) & (d1<=d1[:, None])
x['total_sales'] = np.dot(mask, x['sales'].to_numpy())
return x
df = df.groupby('customerId').apply(f)
print(df)
customerId fromDate sales last_month total_sales
0 1 2022-06-01 100 2021-12-01 200
1 1 2022-05-25 20 2021-11-25 100
2 1 2022-05-25 50 2021-11-25 100
3 1 2022-05-20 30 2021-11-20 30
4 1 2021-09-05 40 2021-03-05 40
5 2 2022-06-02 80 2021-12-02 80
6 3 2021-03-01 50 2020-09-01 70
7 3 2021-02-01 20 2020-08-01 20
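As a quick sanity check (my addition, not part of the original answer), the vectorized result can be compared against the question's slow sum_func on the small sample dataframe; both use a calendar window of 6 months back, so the totals should agree:

# Assumes the question's sum_func and the dataframe above are still in scope.
expected = df[["customerId", "fromDate"]].apply(lambda x: sum_func(x, df), axis=1)
assert (df["total_sales"] == expected).all()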
EDIT: for very large groups, the dot product can be computed in chunks of rows to keep memory usage down:
df["fromDate"] = pd.to_datetime(df["fromDate"], errors="coerce")
df["last_month"] = df["fromDate"] - pd.offsets.DateOffset(months=6)
#https://stackoverflow.com/a/27670190/2901002
def chunking_dot(big_matrix, small_matrix, chunk_size=10000):
# Make a copy if the array is not already contiguous
small_matrix = np.ascontiguousarray(small_matrix)
R = np.empty((big_matrix.shape[0], small_matrix.shape[1]))
for i in range(0, R.shape[0], chunk_size):
end = i chunk_size
R[i:end] = np.dot(big_matrix[i:end], small_matrix)
return R
def f(x):
d1 = x["fromDate"].to_numpy()
d2 = x["last_month"].to_numpy()
mask = (d2[:, None]<=d1) & (d1<=d1[:, None])
# print (mask)
x['total_sales'] = chunking_dot(mask, x[['sales']].to_numpy())
return x
df = df.groupby('customerId').apply(f)
print(df)
customerId fromDate sales last_month total_sales
0 1 2022-06-01 100 2021-12-01 200.0
1 1 2022-05-25 20 2021-11-25 100.0
2 1 2022-05-25 50 2021-11-25 100.0
3 1 2022-05-20 30 2021-11-20 30.0
4 1 2021-09-05 40 2021-03-05 40.0
5 2 2022-06-02 80 2021-12-02 80.0
6 3 2021-03-01 50 2020-09-01 70.0
7 3 2021-02-01 20 2020-08-01 20.0
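Note that chunking_dot only chunks the multiplication; the boolean mask is still built as one n x n array, which for a customer with ~205k rows would itself be tens of gigabytes. A minimal sketch of a variation that also builds the mask chunk by chunk (the helper name chunked_window_sum is mine, not part of the original answer):

import numpy as np

def chunked_window_sum(dates, starts, sales, chunk_size=10000):
    # For each row i, sum sales[j] over all j with starts[i] <= dates[j] <= dates[i],
    # building only a (chunk_size, n) slice of the boolean mask at a time
    # instead of the full (n, n) matrix.
    n = len(dates)
    out = np.empty(n)
    for i in range(0, n, chunk_size):
        end = i + chunk_size
        mask = (starts[i:end, None] <= dates) & (dates <= dates[i:end, None])
        out[i:end] = mask @ sales
    return out

def f(x):
    x['total_sales'] = chunked_window_sum(
        x["fromDate"].to_numpy(),
        x["last_month"].to_numpy(),
        x['sales'].to_numpy(),
    )
    return x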
CodePudding user response:
Use multiprocessing, and treat "6 months" as a fixed window of 180 days to reduce memory usage and computation time.
Copy the following code to a Python file and run it from the console (not from a Jupyter notebook):
import pandas as pd
import numpy as np
import multiprocessing as mp
import time

def sum_sales(customer, df):
    # 1st pass: sum sales of the same day to reduce the number of rows
    df1 = df.groupby('fromDate')['sales'].sum()
    # Generate all missing dates
    df1 = df1.reindex(pd.date_range(df1.index.min(), df1.index.max(), freq='D'), fill_value=0)
    # 2nd pass: use a sliding window of 180 days to sum
    df1 = df1.rolling(180, min_periods=0).sum().astype(int)
    # Restore original index for the group
    df1 = df1.reindex(df['fromDate']).reset_index(drop=True).to_frame().set_index(df.index)
    return df1
if __name__ == '__main__':  # Do not remove this line! Mandatory
    # Setup a minimal reproducible example
    N = 3_000_000
    D = pd.to_datetime('2021-1-1')
    rng = np.random.default_rng(2022)
    dti = D + pd.to_timedelta(rng.integers(0, 365*2, N), unit='D')
    cust = rng.integers(0, 75000, N)
    sales = rng.integers(1, 200, N)
    df = pd.DataFrame({'customerId': cust, 'fromDate': dti, 'sales': sales})

    # Ensure your dataframe is sorted by fromDate for the rolling window
    df = df.sort_values(['customerId', 'fromDate'], ignore_index=True)

    start = time.time()
    with mp.Pool(mp.cpu_count() - 1) as p:
        results = p.starmap(sum_sales, df.groupby('customerId'))
    df['total_sales'] = pd.concat(results)
    end = time.time()

    print(f"Elapsed time: {end - start:.2f} seconds")
For 3 million records and 75k different customers over 2 years (730 days):
[...]$ python mp.py
Elapsed time: 24.36 seconds
However, the number of sales per customer is much better balanced than in your data:
>>> df['customerId'].value_counts().describe(percentiles=np.linspace(0, 1, 11))
count 75000.000000
mean 40.000000
std 6.349157
min 15.000000
0% 15.000000
10% 32.000000
20% 35.000000
30% 37.000000
40% 38.000000
50% 40.000000
60% 41.000000
70% 43.000000
80% 45.000000
90% 48.000000 # <- check the 90th percentile of your data
100% 73.000000
max 73.000000 # <- max transactions for a single customer
Name: customerId, dtype: float64
Because the sales are evenly distributed across customers, my sample benefits from multiprocessing. In your case, I don't think that will hold (check the 90th percentile of your data).
Checking against your sample dataframe:
>>> df
customerId fromDate sales total_sales
0 1 2022-06-01 100 200
1 1 2022-05-25 20 100
2 1 2022-05-25 50 100
3 1 2022-05-20 30 30
4 1 2021-09-05 40 40
5 2 2022-06-02 80 80
6 3 2021-03-01 50 70
7 3 2021-02-01 20 20
If you decide to keep a variable moving window of 6 calendar months instead of a fixed moving window of 180 days, the algorithm is the same. The important point is to reduce the number of rows per customer: in your sample you can sum the sales that share the same (customer, date) pair immediately, e.g. customer 1 has 2 rows for 2022-05-25.
IIUC, in your real data you have a customer with 205284 sales between 2021-02-22 and 2022-03-26 (397 days), i.e. an average of roughly 517 transactions per day. If you sum the sales of the same day first, you reduce the number of records for that customer from 205284 to at most 397 (see the sketch below).
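A minimal sketch of that pre-aggregation step, assuming the question's column names (the daily name is mine); the collapsed frame can then be fed to any of the windowed-sum approaches above, and merged back on (customerId, fromDate) if per-transaction rows are needed:

import pandas as pd

# Collapse multiple sales of the same customer on the same day into one row
# before computing the 180-day (or 6-month) windowed sums.
daily = df.groupby(['customerId', 'fromDate'], as_index=False)['sales'].sum()

# Optional: attach the per-day totals back to the original transaction rows.
df = df.merge(daily.rename(columns={'sales': 'daily_sales'}),
              on=['customerId', 'fromDate'], how='left')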