I work with multiprocessing and I am limited to two API calls at a time, so I need to iterate over at most two rows at a time. The problem is that sometimes the DataFrame doesn't have an even number of rows, so to handle an odd row count I came up with this approach:
import multiprocessing

for_iterate = ...  # the DataFrame to process

if len(for_iterate.index) % 2 == 0:
    df_a = for_iterate.iloc[::2]
    df_b = for_iterate.iloc[1::2]
    df_c = None
else:
    for_iterate_odd = for_iterate.iloc[:-1]
    df_a = for_iterate_odd.iloc[::2]
    df_b = for_iterate_odd.iloc[1::2]
    df_c = for_iterate.iloc[-1]  # the last row as a Series, matching what iterrows() yields below

final_list = []
with multiprocessing.Pool() as pool:
    for (_, example_dataframe_a), (_, example_dataframe_b) in zip(df_a.iterrows(), df_b.iterrows()):
        res_1 = pool.apply_async(add_hour, args=(example_dataframe_a,))
        res_2 = pool.apply_async(add_hour, args=(example_dataframe_b,))
        final_list.extend([res_1.get(), res_2.get()])
    if df_c is not None:
        final_list.append(pool.apply_async(add_hour, args=(df_c,)).get())
As you can see, when the number of rows is odd, I use a separate object that holds only the last row of the DataFrame and run a separate multiprocessing call just for it. Is there a smarter way of getting around this problem?
CodePudding user response:
If you have control over the code in add_hour, you could just use itertools.zip_longest and short-circuit the function to return nothing if None gets passed in. If not, you could just make a wrapper that checks for None and calls add_hour otherwise.
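A minimal sketch of that idea, reusing for_iterate and add_hour from the question; add_hour_or_skip is a hypothetical wrapper name:

from itertools import zip_longest
import multiprocessing

def add_hour_or_skip(row):
    # Hypothetical wrapper: ignore the None padding that zip_longest
    # inserts when the two halves have different lengths.
    if row is None:
        return None
    return add_hour(row)

# Split every row; no special case for an odd number of rows.
df_a = for_iterate.iloc[::2]
df_b = for_iterate.iloc[1::2]

final_list = []
with multiprocessing.Pool() as pool:
    for pair_a, pair_b in zip_longest(df_a.iterrows(), df_b.iterrows(), fillvalue=None):
        row_a = pair_a[1]                                  # df_a is never the shorter half
        row_b = pair_b[1] if pair_b is not None else None  # pair_b is None on the odd last pass
        res_1 = pool.apply_async(add_hour_or_skip, args=(row_a,))
        res_2 = pool.apply_async(add_hour_or_skip, args=(row_b,))
        final_list.extend(r for r in (res_1.get(), res_2.get()) if r is not None)

The even/odd special case disappears because the None padding only shows up when the row count is odd, and the wrapper simply skips it.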
CodePudding user response:
Have you tried using a Pool with a fixed number of processes? I imagine this could be restructured to run with the following code:
from multiprocessing import Pool

import pandas as pd

def add_hour(ix_row):
    ix, row = ix_row  # map passes each (index, row) tuple from iterrows() as one argument
    ...               # compute `result` for this row here
    return ix, result

for_iterate = ...  # the DataFrame to process

with Pool(processes=2) as p:  # two worker processes, so at most two concurrent API calls
    results = p.map(add_hour, for_iterate.iterrows())

result_storage = pd.Series(dict(results))
EDIT: collect the results from the return value of map; a global variable edited inside add_hour would not be shared across worker processes.
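The design point here, assuming the two-calls-at-a-time limit is the only constraint, is that processes=2 already caps how many add_hour calls run concurrently, so the even/odd split and the manual pairing of rows are not needed at all; map distributes the rows across the two workers and returns the results in the original order.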