I want to create dictionaries in a loop.
Since in every iteration I am taking only a part of the initial dataframe (df_train = df[df['CLASS'] == oneClass]), I want to make it parallel.
My code is:
import pandas as pd
import numpy as np
from multiprocessing import Pool
df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data_cols = df_train.select_dtypes(include=np.number).columns.difference(['CLASS'])
    numeric_only_data = df_train[numeric_only_data_cols]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]['CLASS'] = df_train['CLASS']
    return new_df
new_df = {}
classes = np.unique(df['CLASS'])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, classes):
        new_df['new_dataframe'] = new_dataframe
    pool.close()
    pool.join()
In the function, I omitted the original for loop:
new_df = {}
for oneClass in classes:
    df_train = df[df['GROUP_DESC'] == oneClass]
    ...
Now I am receiving:
make_dataframes() missing 1 required positional argument: 'oneClass'
I am not sure how to pass the function's arguments, or whether classes is a valid argument for map.
CodePudding user response:
Are you planning on executing your code inside a cluster? If not, then you're probably better off executing your code in the old-fashioned single-process way. There's a great talk on the subject by Raymond Hettinger that I find pretty interesting and recommend checking out: Raymond Hettinger, Keynote on Concurrency, PyBay 2017.
Having said that, one easy fix to your implementation would be to define a single parameter as input to make_dataframes that represents a tuple of both df and oneClass:
import pandas as pd
import numpy as np
from multiprocessing import Pool
def make_dataframes(args):
    new_df = {}
    df = args[0]         # <--- Unpacking values
    oneClass = args[-1]  # <--- Unpacking values
    df_train = df[df['CLASS'] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[:, lambda xdf: xdf.columns.difference(['CLASS'])]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[:, numeric_only_data.columns != 'CLASS'].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]['CLASS'] = df_train['CLASS'].values  # .values avoids index misalignment with the freshly built frame
    return new_df
df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
new_df = {}
classes = np.unique(df["CLASS"])
with Pool(4) as pool:
    for new_dataframe in pool.map(make_dataframes, zip([df]*len(classes), classes)):
        new_df[list(new_dataframe.keys())[0]] = list(new_dataframe.values())[0]
    pool.close()
    pool.join()
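Alternatively, if you want to keep make_dataframes with its original two parameters (df, oneClass), Pool.starmap unpacks each argument tuple for you. A minimal sketch of that variant, using the two-argument function from your question:
with Pool(4) as pool:
    # starmap unpacks each (df, oneClass) tuple into the two positional arguments
    for new_dataframe in pool.starmap(make_dataframes, [(df, c) for c in classes]):
        new_df.update(new_dataframe)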
A second approach would be to use the Joblib package instead of multiprocessing, like so:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]["CLASS"] = df_train["CLASS"].values  # .values avoids index misalignment with the freshly built frame
    return new_df
df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {
    key: value
    for parallel in Parallel(n_jobs=4)(
        delayed(make_dataframes)(df, i) for i in classes
    )
    for key, value in parallel.items()
}
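As a side note, joblib can also run these calls with a thread-based backend, which avoids pickling a copy of df for every worker; whether that is faster depends on how much of the work releases the GIL, so treat this as a variant to benchmark rather than a drop-in win:
# Same calls, but executed by threads instead of separate processes
results = Parallel(n_jobs=4, prefer="threads")(
    delayed(make_dataframes)(df, i) for i in classes
)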
Finally, here is the approach I recommend if you're not planning on running this code inside a power-hungry cluster and don't need to squeeze every last drop of performance out of it:
import pandas as pd
import numpy as np
from joblib import Parallel, delayed
def make_dataframes(df, oneClass):
    new_df = {}
    df_train = df[df["CLASS"] == oneClass]
    numeric_only_data = df_train.select_dtypes(include=np.number).loc[
        :, lambda xdf: xdf.columns.difference(["CLASS"])
    ]
    X = numeric_only_data.values
    x = X * 100
    orig_columns = numeric_only_data.loc[
        :, numeric_only_data.columns != "CLASS"
    ].columns
    new_df[oneClass] = pd.DataFrame(x, columns=orig_columns)
    new_df[oneClass]["CLASS"] = df_train["CLASS"].values  # .values avoids index misalignment with the freshly built frame
    return new_df
df = pd.DataFrame({'a':[0,1,2], 'b':[3, 4, 5], 'c': [6, 7, 8], 'CLASS':['A', 'B', 'C']})
classes = np.unique(df["CLASS"])
new_df = {c: make_dataframes(df, c)[c] for c in classes}
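All three variants end up with the same kind of result: a dictionary mapping each class label to its scaled DataFrame. A quick sanity check that works for whichever version you run:
for cls, frame in new_df.items():
    print(cls, frame.shape)  # e.g. A (1, 4) for the toy dataframe above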
For comparison, I've recorded each approach's execution time:
multiprocessing: CPU times: user 13.6 ms, sys: 41.1 ms, total: 54.7 ms; Wall time: 158 ms
joblib: CPU times: user 14.3 ms, sys: 0 ns, total: 14.3 ms; Wall time: 16.5 ms
Serial processing: CPU times: user 14.1 ms, sys: 797 µs, total: 14.9 ms; Wall time: 14.9 ms
Running things in parallel adds a lot of communication overhead between the different worker processes. Besides, it's an intrinsically more complex task than running things serially, so developing and maintaining the code becomes considerably harder and more expensive. If running things in parallel is the number one priority, I would recommend first ditching Pandas and using PySpark or Dask instead.
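For a rough idea of what that could look like, here is a minimal, illustrative Dask sketch (assuming dask[dataframe] is installed; the partition count is an arbitrary placeholder, not a tuned value):
import dask.dataframe as dd

# Split the pandas frame into partitions that Dask can process in parallel
ddf = dd.from_pandas(df, npartitions=4)
numeric_cols = [c for c in df.columns if c != 'CLASS']

# Scale every numeric column by 100 while leaving CLASS untouched
scaled = ddf.assign(**{c: ddf[c] * 100 for c in numeric_cols})
result = scaled.compute()  # triggers the parallel computation, returns a pandas DataFrame

# The per-class dictionary can then be built from the materialized result
new_df = {c: g.reset_index(drop=True) for c, g in result.groupby('CLASS')}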