I have a script that loops over a pandas dataframe and outputs GIS data to a geopackage based on some searches and geometry manipulation. It works when I use a for loop, but with over 4k records it takes a while. Since I have it built as its own function that returns what I need for each row, I tried to run it with multiprocessing:
import pandas as pd, bwe_mapping
from multiprocessing import Pool

# Sample dataframe (one row built from key/value pairs)
bwes = [['id', 7216], ['item_id', 3277841], ['Date', '2019-01-04T00:00:00.000Z'], ['start_lat', -56.92], ['start_lon', 45.87], ['End_lat', -59.87], ['End_lon', 44.67]]
bwedf = pd.DataFrame([dict(bwes)])
geopackage = r"datalocation\geopackage.gpkg"
tracklayer = "tracks"

if __name__=='__main__':
    def task(item):
        bwe_mapping.map_bwe(item, geopackage, tracklayer)
    pool = Pool()
    for index, row in bwedf.iterrows():
        task(row)
    with Pool() as pool:
        for results in pool.imap_unordered(task, bwedf.iterrows()):
            print(results)
When I run this, my Task Manager populates with 16 new python tasks but there is no sign that anything is being done. Would it be better to use numpy.array_split() to break my pandas df into 4 or 8 smaller ones and run the for index, row in bwedf.iterrows(): loop for each dataframe on its own processor? The rows do not need to be processed in any particular order, as long as I can store the outputs, which are GeoPandas GeoDataFrames, in a list to concatenate into geopackage layers at the end. Should I have put the for loop in the function and just passed it the whole dataframe and the GIS data to search?
CodePudding user response:
If you are running on Windows or macOS (where spawn is the default start method since Python 3.8), multiprocessing uses spawn to create the workers, which means that every child MUST be able to find the function it is going to execute when it imports your main script.
Your code has the function definition inside the if __name__=='__main__': block, so the children don't have access to it.
Simply moving the function def to before if __name__=='__main__': will make it work.
What is happening is that each child crashes when it tries to run the function, because it never saw the function's definition.
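To confirm which start method applies on a given machine, a quick check along these lines may help. `multiprocessing.get_start_method()` is part of the standard library; the wrapper name `describe_start_method` is only an illustrative choice.

```python
import multiprocessing as mp


def describe_start_method():
    """Return the start method that new worker processes will use."""
    # With no method set explicitly, this returns the platform default.
    return mp.get_start_method()


if __name__ == "__main__":
    # Typically "spawn" on Windows and recent macOS, which is the case
    # where workers re-import the main script.
    print(describe_start_method())
```

If this reports spawn, anything defined only under the `if __name__ == '__main__':` guard is invisible to the workers.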
Minimal code to reproduce the problem:
from multiprocessing import Pool

if __name__ == '__main__':
    def task(item):
        print(item)
        return item
    pool = Pool()
    with Pool() as pool:
        for results in pool.imap_unordered(task, range(10)):
            print(results)
The solution is to move the function definition to before the if __name__=='__main__': line.
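Applied to the question's script, the fix might look roughly like the sketch below. It makes several assumptions: `map_record` is a hypothetical stand-in for `bwe_mapping.map_bwe` (whose real behaviour is not shown in the question), plain dicts stand in for dataframe rows, and `run_parallel` is an illustrative name. Two further details are worth noting. The question's `task` does not return the value from `map_bwe`, so there would be nothing to collect. Also, `bwedf.iterrows()` yields `(index, row)` tuples rather than bare rows.

```python
from functools import partial
from multiprocessing import Pool


def map_record(record, geopackage, tracklayer):
    """Hypothetical stand-in for bwe_mapping.map_bwe; it must return its result."""
    # Placeholder work: a real version would build and return a GeoDataFrame.
    return {"id": record["id"], "layer": tracklayer}


def run_parallel(records, geopackage, tracklayer, processes=None):
    """Map every record in a worker pool and collect the results in a list."""
    # partial binds the extra arguments so each worker call receives one record.
    worker = partial(map_record, geopackage=geopackage, tracklayer=tracklayer)
    with Pool(processes) as pool:
        # Results arrive in completion order, not input order.
        return list(pool.imap_unordered(worker, records))


if __name__ == "__main__":
    # With pandas, the records could come from bwedf.to_dict("records").
    sample = [{"id": 7216}, {"id": 7217}]
    results = run_parallel(sample, r"datalocation\geopackage.gpkg", "tracks")
    print(sorted(r["id"] for r in results))
```

Because both functions live at module level, spawned workers can import them, and only the code that starts the pool sits behind the guard. The collected list can then be concatenated once in the parent process, which also avoids several workers writing to the same geopackage at the same time.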