I have a process that iterates over a pandas DataFrame row by row, does some work, and writes out results.
# For every row of df: build the start/end points, write the buffered area
# and the connecting line as layers of '<year>.gpkg', then run map_event.
# Rows whose map_event call fails are recorded in error_list and the loop
# moves on to the next row.
for index, row in df.iterrows():
    # Kept inside the loop as in the original: outputs are written with
    # relative paths, and this re-selects outputdir before each row.
    os.chdir(outputdir)

    # One-row DataFrame form of the current row; map_event is called with it.
    rowdf = row.to_frame().T
    record = rowdf.iloc[0]
    event_id = record.id
    gpkg_path = str(year) + '.gpkg'

    # Start and end coordinates, both tagged with the row's id.
    points = [
        [event_id, record.start_location_long, record.start_location_lat],
        [event_id, record.end_location_long, record.end_location_lat],
    ]
    # Build the points table once and reuse it for the attributes and for
    # both coordinate columns.
    points_df = pd.DataFrame(points, columns=['id', 'longitude', 'latitude'])
    pgdf = gpd.GeoDataFrame(
        points_df,
        geometry=gpd.points_from_xy(points_df.longitude, points_df.latitude),
        crs=4326,
    ).to_crs('esri:102001')

    area = buffer(pgdf)
    area['id'] = event_id
    area.to_crs(4326).to_file(
        gpkg_path, driver='GPKG', layer=str(event_id) + '_area'
    )
    make_line(pgdf, 'id').to_crs(4326).to_file(
        gpkg_path, driver='GPKG', layer=str(event_id) + '_plot'
    )

    try:
        map_event(rowdf)
    except Exception as e:
        # Record the failing row's id. The original read row.iloc[0].id,
        # i.e. an attribute of the row's first value rather than its id.
        error_list.append([str(event_id), str(e)])
This runs, outputting a polygon, a line segment, and a linestring if map_event(rowdf) is successful. When I rewrite it as a function:
def task(row):
    """Write the area and line layers for one row, then run map_event on it.

    Args:
        row: a single DataFrame row as a pandas Series. The first statement
            calls ``row.to_frame()``, so any other kind of object (for
            example a tuple) fails there with AttributeError.

    Side effects:
        Writes the layers '<id>_area' and '<id>_plot' to '<year>.gpkg'
        (relative path) and appends ``[id, message]`` to ``error_list``
        when ``map_event`` raises. ``year`` and ``error_list`` are read
        from the enclosing module scope.
    """
    # One-row DataFrame form of the row; map_event is called with it.
    rowdf = row.to_frame().T
    record = rowdf.iloc[0]
    event_id = record.id
    gpkg_path = str(year) + '.gpkg'

    # Start and end coordinates, both tagged with the row's id.
    points = [
        [event_id, record.start_location_long, record.start_location_lat],
        [event_id, record.end_location_long, record.end_location_lat],
    ]
    # Build the points table once and reuse it for the attributes and for
    # both coordinate columns.
    points_df = pd.DataFrame(points, columns=['id', 'longitude', 'latitude'])
    pgdf = gpd.GeoDataFrame(
        points_df,
        geometry=gpd.points_from_xy(points_df.longitude, points_df.latitude),
        crs=4326,
    ).to_crs('esri:102001')

    area = buffer(pgdf)
    area['id'] = event_id
    area.to_crs(4326).to_file(
        gpkg_path, driver='GPKG', layer=str(event_id) + '_area'
    )
    make_line(pgdf, 'id').to_crs(4326).to_file(
        gpkg_path, driver='GPKG', layer=str(event_id) + '_plot'
    )

    try:
        map_event(rowdf)
    except Exception as e:
        # Record the failing row's id. The original read row.iloc[0].id,
        # i.e. an attribute of the row's first value rather than its id.
        # NOTE(review): if task runs in a separate worker process, this
        # append presumably does not reach the parent's error_list - confirm.
        error_list.append([str(event_id), str(e)])
and pass it to a multiprocessing pool with
# Run task over the rows of df in a pool of worker processes; any exception
# raised along the way is printed rather than propagated.
try:
# Make outputdir the working directory before the pool is created.
os.chdir(outputdir)
# NOTE(review): os.cpu_count()-4 is below 1 on machines with four or fewer
# CPUs (and cpu_count() can return None), in which case creating the Pool
# fails - confirm the intended minimum worker count.
with Pool(os.cpu_count()-4) as pool:
# df.iterrows() yields (index, Series) tuples; pool.map passes each tuple to
# task as its single argument.
for results in pool.map(task, df.iterrows()):
# The results are not used; the loop only consumes them.
pass
except Exception as e:
print(e)
my row can no longer be transposed and converted to a DataFrame; I get the error 'tuple' object has no attribute 'to_frame'.
What is causing the behavior change for the line rowdf = row.to_frame().T between single and parallel processing?
CodePudding user response:
df.iterrows() passes a tuple of (index, row) to task. From the docs:
DataFrame.iterrows(): Iterate over DataFrame rows as (index, Series) pairs.
So change task accordingly:
def task(data):
    """Handle one item produced by ``df.iterrows()``.

    Args:
        data: an ``(index, row)`` pair; it is unpacked so that the rest of
            the function can work with ``row`` directly.
    """
    index, row = data
    # ... use row