I have a large customer data set (10 million rows) on which I am running a loop calculation. I am trying to speed it up with multiprocessing by splitting data1 into chunks and running them in SageMaker Studio, but the calculation takes longer with multiprocessing than without it. I am not sure what I am doing wrong; please help.
input data example:
import multiprocessing as mp
import numpy as np
import pandas as pd

state_list = ['A', 'B', 'C', 'D', 'E']  # possible states
data1 = pd.DataFrame({"cust_id": ['x111', 'x112'],  # customer data
                      "state": ['B', 'E'],
                      "amount": [1000, 500],
                      "year": [3, 2],
                      "group": [10, 10],
                      "loan_rate": [0.12, 0.13]})
data1['state'] = pd.Categorical(data1['state'],
                                categories=state_list,
                                ordered=True).codes
lookup1 = pd.DataFrame({'year': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                        'lim %': [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]}).set_index(['year'])
matrix_data = np.arange(250).reshape(10, 5, 5)  # 3d matrix by state (A-E) and year (1-10)
end = pd.Timestamp(year=2021, month=9, day=1)  # creating a list of dates
df = pd.DataFrame({"End": pd.date_range(end, periods=10, freq="M")})
df['End'] = df['End'].dt.day
End = df.values
end_dates = End.reshape(-1)  # array([30, 31, 30, 31, 31, 28, 31, 30, 31, 30]); just to simplify access to the end date values
calculation:
num_processes = 4
# Split the customer data into chunks
chunks = np.array_split(data1, num_processes)
queue = mp.Queue()

def calc(chunk):
    results1 = {}
    for cust_id, state, amount, start, group, loan_rate in chunk.itertuples(name=None, index=False):
        res1 = [amount * matrix_data[start-1, state, :]]
        for year in range(start + 1, len(matrix_data) + 1):
            res1.append(lookup1.loc[year].iat[0] * np.array(res1[-1]))
            res1.append(res1[-1] * loan_rate * end_dates[year-1]/365)  # year - 1 here
            res1.append(res1[-1] + 100)
            res1.append(np.linalg.multi_dot([res1[-1], matrix_data[year-1]]))
        results1[cust_id] = res1
    queue.put(results1)

processes = [mp.Process(target=calc, args=(chunk,)) for chunk in chunks]
for p in processes:
    p.start()
for p in processes:
    p.join()

results1 = {}
while not queue.empty():
    results1.update(queue.get())
Answer:
I think it would be easier to use a multiprocessing pool with the map method, which submits tasks in chunks anyway; your worker function calc then just needs to deal with individual tuples, since the chunking is done for you transparently. The pool will compute what it thinks is an optimal number of rows to be chunked together based on the total number of rows and the number of processes in the pool, but you can override this. So a solution would look something like the following. Since you have not tagged your question with the OS you are running under, the code below should run under Windows, Linux or macOS in the most efficient way for that platform. But as I mentioned in a comment, multiprocessing may actually slow down getting your results if calc is not sufficiently CPU-intensive; a quick timing comparison (see the sketch after the printed output below) will tell you whether it pays off.
from multiprocessing import Pool
import pandas as pd
import numpy as np

def init_pool_processes(*args):
    # Make the read-only lookup structures available as globals in each worker
    global lookup1, matrix_data, end_dates
    lookup1, matrix_data, end_dates = args  # unpack

def calc(t):
    cust_id, state, amount, start, group, loan_rate = t  # unpack one customer row
    res1 = [amount * matrix_data[start-1, state, :]]
    for year in range(start + 1, len(matrix_data) + 1):
        res1.append(lookup1.loc[year].iat[0] * np.array(res1[-1]))
        res1.append(res1[-1] * loan_rate * end_dates[year-1]/365)  # year - 1 here
        res1.append(res1[-1] + 100)
    return (cust_id, res1)  # return tuple

def main():
    state_list = ['A', 'B', 'C', 'D', 'E']  # possible states
    data1 = pd.DataFrame({"cust_id": ['x111', 'x112'],  # customer data
                          "state": ['B', 'E'],
                          "amount": [1000, 500],
                          "year": [3, 2],
                          "group": [10, 10],
                          "loan_rate": [0.12, 0.13]})
    data1['state'] = pd.Categorical(data1['state'],
                                    categories=state_list,
                                    ordered=True).codes
    lookup1 = pd.DataFrame({'year': [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
                            'lim %': [0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1, 0.1]}).set_index(['year'])
    matrix_data = np.arange(250).reshape(10, 5, 5)  # 3d matrix by state (A-E) and year (1-10)
    end = pd.Timestamp(year=2021, month=9, day=1)  # creating a list of dates
    df = pd.DataFrame({"End": pd.date_range(end, periods=10, freq="M")})
    df['End'] = df['End'].dt.day
    End = df.values
    end_dates = End.reshape(-1)  # array([30, 31, 30, 31, 31, 28, 31, 30, 31, 30]); just to simplify access to the end date values

    with Pool(initializer=init_pool_processes, initargs=(lookup1, matrix_data, end_dates)) as pool:
        results = {cust_id: arr for cust_id, arr in pool.map(calc, data1.itertuples(name=None, index=False))}
    for cust_id, arr in results.items():
        print(cust_id, arr)

if __name__ == '__main__':
    main()
Prints:
x111 [array([55000, 56000, 57000, 58000, 59000]), array([5500., 5600., 5700., 5800., 5900.]), array([56.05479452, 57.0739726 , 58.09315068, 59.11232877, 60.13150685]), array([156.05479452, 157.0739726 , 158.09315068, 159.11232877,
160.13150685]), array([15.60547945, 15.70739726, 15.80931507, 15.91123288, 16.01315068]), array([0.15904763, 0.16008635, 0.16112507, 0.1621638 , 0.16320252]), array([100.15904763, 100.16008635, 100.16112507, 100.1621638 ,
100.16320252]), array([10.01590476, 10.01600864, 10.01611251, 10.01621638, 10.01632025]), array([0.09220121, 0.09220216, 0.09220312, 0.09220407, 0.09220503]), array([100.09220121, 100.09220216, 100.09220312, 100.09220407,
100.09220503]), array([10.00922012, 10.00922022, 10.00922031, 10.00922041, 10.0092205 ]), array([0.10201178, 0.10201178, 0.10201178, 0.10201178, 0.10201178]), array([100.10201178, 100.10201178, 100.10201178, 100.10201178,
100.10201178]), array([10.01020118, 10.01020118, 10.01020118, 10.01020118, 10.01020118]), array([0.09873075, 0.09873075, 0.09873075, 0.09873075, 0.09873075]), array([100.09873075, 100.09873075, 100.09873075, 100.09873075,
100.09873075]), array([10.00987308, 10.00987308, 10.00987308, 10.00987308, 10.00987308]), array([0.10201843, 0.10201843, 0.10201843, 0.10201843, 0.10201843]), array([100.10201843, 100.10201843, 100.10201843, 100.10201843,
100.10201843]), array([10.01020184, 10.01020184, 10.01020184, 10.01020184, 10.01020184]), array([0.09873076, 0.09873076, 0.09873076, 0.09873076, 0.09873076]), array([100.09873076, 100.09873076, 100.09873076, 100.09873076,
100.09873076])]
x112 [array([22500, 23000, 23500, 24000, 24500]), array([2250., 2300., 2350., 2400., 2450.]), array([24.04109589, 24.57534247, 25.10958904, 25.64383562, 26.17808219]), array([124.04109589, 124.57534247, 125.10958904, 125.64383562,
126.17808219]), array([12.40410959, 12.45753425, 12.5109589 , 12.56438356, 12.61780822]), array([0.13695496, 0.13754483, 0.1381347 , 0.13872456, 0.13931443]), array([100.13695496, 100.13754483, 100.1381347 , 100.13872456,
100.13931443]), array([10.0136955 , 10.01375448, 10.01381347, 10.01387246, 10.01393144]), array([0.11056217, 0.11056282, 0.11056347, 0.11056413, 0.11056478]), array([100.11056217, 100.11056282, 100.11056347, 100.11056413,
100.11056478]), array([10.01105622, 10.01105628, 10.01105635, 10.01105641, 10.01105648]), array([0.09983629, 0.09983629, 0.09983629, 0.09983629, 0.09983629]), array([100.09983629, 100.09983629, 100.09983629, 100.09983629,
100.09983629]), array([10.00998363, 10.00998363, 10.00998363, 10.00998363, 10.00998363]), array([0.11052119, 0.11052119, 0.11052119, 0.11052119, 0.11052119]), array([100.11052119, 100.11052119, 100.11052119, 100.11052119,
100.11052119]), array([10.01105212, 10.01105212, 10.01105212, 10.01105212, 10.01105212]), array([0.10696741, 0.10696741, 0.10696741, 0.10696741, 0.10696741]), array([100.10696741, 100.10696741, 100.10696741, 100.10696741,
100.10696741]), array([10.01069674, 10.01069674, 10.01069674, 10.01069674, 10.01069674]), array([0.11052906, 0.11052906, 0.11052906, 0.11052906, 0.11052906]), array([100.11052906, 100.11052906, 100.11052906, 100.11052906,
100.11052906]), array([10.01105291, 10.01105291, 10.01105291, 10.01105291, 10.01105291]), array([0.10696741, 0.10696741, 0.10696741, 0.10696741, 0.10696741]), array([100.10696741, 100.10696741, 100.10696741, 100.10696741,
100.10696741])]
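To see whether calc is CPU-intensive enough for the pool to pay off, time the serial and pooled versions against each other. The following is a minimal sketch (the benchmark function name is my own), reusing the calc and init_pool_processes defined above; on the toy two-row data1 the serial loop will win easily, so run it on a realistically sized slice of your 10 million rows:

from multiprocessing import Pool
from time import perf_counter

def benchmark(data1, lookup1, matrix_data, end_dates):
    rows = list(data1.itertuples(name=None, index=False))

    # Set the globals in the parent process too, so the serial run of calc works
    init_pool_processes(lookup1, matrix_data, end_dates)
    t0 = perf_counter()
    serial = [calc(t) for t in rows]
    t_serial = perf_counter() - t0

    # The pooled timing deliberately includes the cost of starting the pool
    t0 = perf_counter()
    with Pool(initializer=init_pool_processes,
              initargs=(lookup1, matrix_data, end_dates)) as pool:
        pooled = pool.map(calc, rows)
    t_pooled = perf_counter() - t0

    print(f"serial: {t_serial:.3f}s   pooled: {t_pooled:.3f}s")

If the pooled time is not clearly lower, the per-row work is too cheap relative to the pickling and process-startup overhead, which would explain the slowdown you are seeing.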
If you wish to save memory, you could use the method imap_unordered:
def main():
    ...  # code omitted

    def compute_chunksize(iterable_size, pool_size):
        # Same heuristic map uses: split the work into roughly 4 chunks per process
        chunksize, remainder = divmod(iterable_size, 4 * pool_size)
        if remainder:
            chunksize += 1
        return chunksize

    from multiprocessing import cpu_count

    pool_size = cpu_count()
    iterable_size = 100_000  # Your best estimate
    chunksize = compute_chunksize(iterable_size, pool_size)
    with Pool(pool_size, initializer=init_pool_processes, initargs=(lookup1, matrix_data, end_dates)) as pool:
        it = pool.imap_unordered(calc, data1.itertuples(name=None, index=False), chunksize=chunksize)
        """
        # Create dictionary in memory:
        results = {cust_id: arr for cust_id, arr in it}
        """
        # Or to save memory, iterate the results:
        for cust_id, arr in it:
            print(cust_id, arr)

if __name__ == '__main__':
    main()
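To make the chunksize arithmetic concrete (the 10 million rows come from your question; an 8-process pool is just an assumed figure):

>>> pool_size = 8                       # assumed number of processes
>>> iterable_size = 10_000_000          # row count from the question
>>> divmod(iterable_size, 4 * pool_size)
(312500, 0)
>>> compute_chunksize(iterable_size, pool_size)  # no remainder, so no +1
312500

So each task sent to a worker would carry 312,500 rows, and each process would handle roughly four such chunks over the life of the pool, which keeps the pickling overhead per row small.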