I have a dataframe df1 with N rows. Each row, through complicated calculations, results in up to K rows, so the source dataframe df1 with N rows produces an output dataframe dfY with up to NxK rows. The real df1 can have N=1,000 rows, the per-row calculation can take about 1 s, and each processed row can generate several thousand output rows K, so multiprocessing would be useful.
For simplicity, let's assume that the algorithm that calculates the K rows is just a silly way to build a cross product without using merge. My problem is that the following example works without multiprocessing, in about 12 ms: function unip().
But with multiprocessing, the code gets stuck in what looks like an endless loop: function multip(). What am I doing wrong when I try to apply row-wise multiprocessing to df1 in multip()?
import pandas as pd
import numpy as np
import multiprocessing as mp
df1 = pd.DataFrame(np.array(np.meshgrid(['A', 'B', 'C'], ['X', 'Y', 'Z'])).T.reshape(-1,2), columns=['Product', 'Store'])
df1.iloc[[0,-1]]
df2 = pd.DataFrame(np.arange(1,13), columns=['Month'])
df2.iloc[[0,-1]]
def row_to_df(row):
    dfX = df2.copy()
    dfX.loc[:, "Product"] = row[0]  # row["Product"]
    dfX.loc[:, "Store"] = row[1]  # row["Store"]
    return dfX
def unip():
    for i, row in enumerate(df1.itertuples(index=False)):
        dfX = row_to_df(row)
        if i == 0:
            dfY = dfX.copy()
        else:
            dfY = pd.concat([dfY, dfX], axis=0)
    return dfY
def iterate_rows(df):
    for i, row in enumerate(df.itertuples(index=False)):
        dfX = row_to_df(row)
        if i == 0:
            dfY = dfX.copy()
        else:
            dfY = pd.concat([dfY, dfX], axis=0)
    dfY = dfY.reset_index(drop=True)
    display(dfY)
    return dfY
def multip():
    Nrows = df1.shape[0]
    Ncores = mp.cpu_count()
    Nparts = min([Nrows, Ncores])
    listParts = np.array_split(df1, Nparts)
    pool = mp.Pool(Ncores)
    dfY = pd.concat(pool.map(row_to_df, [iterate_rows(df) for df in listParts]))
    pool.close()
    pool.join()
    return dfY
%%time
dfY = unip()
dfY
%%time
dfY = multip()
dfY
The display(dfY) call in iterate_rows() shows that multip() creates the dataframe chunks for the Product/Store rows from (A, X) to (C, Z) as intended. But multip() never completes their concatenation the way unip() does.
CodePudding user response:
Your multip() implementation should just be

def multip():
    with mp.Pool() as pool:
        dfY = pd.concat(
            pool.imap(
                row_to_df,
                df1.itertuples(index=False, name=None),
                chunksize=20,
            )
        )
    return dfY

for it to work equivalently to unip(). Your original version calls iterate_rows() eagerly in the parent process (which is why the display() output appears) and then maps row_to_df over whole DataFrame chunks rather than over rows. Here the rows themselves are fed to the pool, and name=None yields plain tuples, which pickle cleanly for the workers and keep the row[0]/row[1] indexing in row_to_df working.
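An aside not in the original answer: if you run this as a plain script on Windows or macOS, where multiprocessing uses the spawn start method and re-imports the main module in every worker, the pool must only be created under a main guard:

if __name__ == "__main__":
    dfY = multip()
    print(dfY)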
That said, for this test data and algorithm it's about 90x slower than unip(); you'll want to check whether that holds true for your actual algorithm and data.
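If the per-row inter-process overhead dominates, a coarser-grained variant can ship whole chunks to the workers instead of single rows. This is my own sketch, reusing the question's names; expand_chunk is a hypothetical helper, not part of the answer:

def expand_chunk(df):
    # Expand every row of one chunk and concatenate the results.
    return pd.concat(
        row_to_df(row) for row in df.itertuples(index=False, name=None)
    )

def multip_chunks():
    parts = np.array_split(df1, mp.cpu_count())
    with mp.Pool() as pool:
        dfY = pd.concat(pool.map(expand_chunk, parts))
    return dfY.reset_index(drop=True)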
You may also wish to check out dask.dataframe.
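For completeness, a minimal dask.dataframe sketch of the same expansion; this is my own illustration under the assumption that dask is installed, not code from the answer. map_partitions applies the expansion to each partition of df1, and meta declares the expected output schema:

import dask.dataframe as dd

def expand_partition(part):
    # Same row-wise expansion, applied to one partition of df1.
    return pd.concat(
        row_to_df(row) for row in part.itertuples(index=False, name=None)
    )

ddf = dd.from_pandas(df1, npartitions=mp.cpu_count())
dfY = ddf.map_partitions(
    expand_partition,
    meta={"Month": "int64", "Product": "object", "Store": "object"},
).compute(scheduler="processes")  # default is threads; processes suit CPU-bound work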