Two separate queries.
1. I have 'm' raster files and 'n' vector files. I would like to use a map function (as in R) and iterate through the list of 'n' vector files for each of the 'm' raster files. I got the output by writing a separate for loop for each vector file.
2. As given below, I am using a for loop for each vector file. If I run it in a single script, I will be using only a single processor. Is it possible to use multiprocessing to reduce the time?
Here is the for loop:
filenames_dat[i] is the raster input
import geopandas as gpd
import numpy as np
import pandas as pd
from rasterio.mask import mask
from rasterstats import zonal_stats

df1 = gpd.read_file("input_path")
df2 = gpd.read_file("input_path")

for i in range(len(raster_path)):
    array_name, trans_name = mask(filenames_dat[i], shapes=df1.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df1, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    df1['amg' + str(filenames[i])] = [x[('mean')] for x in zs]
    df1['mpg' + str(filenames[i])] = [x[('sum')] for x in zs]
    print(i)
df1csv = pd.DataFrame(df1)
df1csv.to_csv(cwd + '/rasteroutput/df1.csv', index=False)

for i in range(len(raster_path)):
    array_name, trans_name = mask(filenames_dat[i], shapes=df2.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df2, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    df2['amg' + str(filenames[i])] = [x[('mean')] for x in zs]
    df2['mpg' + str(filenames[i])] = [x[('sum')] for x in zs]
    print(i)
df2csv = pd.DataFrame(df2)
df2csv.to_csv(cwd + '/rasteroutput/df2.csv', index=False)
Here is the function which I have not used, as I am not sure how to use map with multiple arguments. 'i' is the index into the raster list. The poly2 function works for a single integer 'i' (i.e. i = 1), but not when I pass 'i' as a list of indices: list(map(poly2, lst, df)) shows an error. I was looking for something similar to map2df in R.
def poly2(i, df):  ## i is for year
    df = df
    array_name, trans_name = mask(filenames_dat[i], shapes=df.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    df['amg' + str(filenames[i])] = [x[('mean')] for x in zs]
    df['mpg' + str(filenames[i])] = [x[('sum')] for x in zs]
    print(i)

lst = []
for i in range(len(raster_path)):
    lst.append(i)

poly2(i=1, df=df)
list(map(poly2, lst, df))  ## shows error
CodePudding user response:
I also find the description of your problem a bit confusing. However, one possible way to use multiprocessing in Python is shown below.
Pool.starmap takes an iterable of argument tuples and unpacks each tuple into the arguments of a single call; see the multiprocessing documentation.
from multiprocessing import Pool

def add(x, y, id_num):
    print(f"\nRunning Process: {id_num} -> result: {x + y}")
    return x + y

tasks_x = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
tasks_y = [10] * 10

with Pool(processes=2) as pool:
    results = pool.starmap(add, zip(tasks_x, tasks_y, range(10)))
Which results in:
Running Process: 2 -> result: 13
Running Process: 0 -> result: 11
Running Process: 3 -> result: 14
Running Process: 1 -> result: 12
Running Process: 4 -> result: 15
Running Process: 6 -> result: 17
Running Process: 7 -> result: 18
Running Process: 5 -> result: 16
Running Process: 8 -> result: 19
Running Process: 9 -> result: 20
Note that 'results' is still sorted by the input order:
results
Out[8]: [11, 12, 13, 14, 15, 16, 17, 18, 19, 20]
I also recommend looking into joblib; it is sometimes very handy and allows you to write one-liners.
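For instance, here is a minimal joblib sketch of the same computation, reusing the add function and the task lists from above (not your zonal-statistics code):

from joblib import Parallel, delayed

# Run add() in parallel over the same task lists; n_jobs=2 uses two worker processes.
results = Parallel(n_jobs=2)(
    delayed(add)(x, y, i) for i, (x, y) in enumerate(zip(tasks_x, tasks_y))
)
print(results)  # [11, 12, ..., 20], again in input order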
CodePudding user response:
I will answer one of your questions first. Suppose you have a function poly2 that takes an argument i, which varies, and an argument df, whose value is the same on every call, and you need to call poly2 as if it were a function of a single argument (as required by the map function). Then:
from functools import partial

df = some_data_frame
modified_poly = partial(poly2, df=df)
# Now calling modified_poly(0) is equivalent to calling
# poly2(0, df)
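With that wrapper, the map call you were attempting reduces to a single iterable (a sketch, assuming lst is your list of raster indices):

results = list(map(modified_poly, lst))  # each element i is passed on as poly2(i, df)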
You could also use itertools.starmap or, if doing multiprocessing, multiprocessing.pool.Pool.starmap, which can handle the invocation of a function that takes multiple arguments. But in your case, where one argument (df) never varies, I would just use functools.partial. Please read up on this. Moving on ...
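For reference, here is a minimal Pool.starmap sketch using the poly2(i, df) signature from the question. It only illustrates the invocation mechanics; df is pickled and sent with every task, and the address-space caveat discussed further down still applies:

from multiprocessing import Pool

if __name__ == '__main__':
    with Pool() as pool:
        # Each (i, df) tuple is unpacked into a poly2(i, df) call.
        results = pool.starmap(poly2, [(i, df) for i in range(len(raster_path))])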
You didn't really answer my question in a way that makes things clearer for me. But I will just assume you are trying to parallelize the following code:
def poly2(i, df):  ## i is for year
    df = df
    array_name, trans_name = mask(filenames_dat[i], shapes=df.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    df['amg' + str(filenames[i])] = [x[('mean')] for x in zs]
    df['mpg' + str(filenames[i])] = [x[('sum')] for x in zs]
    print(i)
My continued source of confusion stems from your saying you have m raster files and n vector files and that you use a for loop for each vector file. You further say filenames_dat[i] is the "raster input". So the above loop seems to iterate over each raster file, but I do not see where you then iterate over each vector file for a given raster file. Or does the call to zonal_stats iterate through the vector files? Let's move on again ...
First, a few observations:
- df = df accomplishes nothing.
- ('mean') evaluates to 'mean'. It's not wrong per se, but why not just x['mean']?
- Your call to map is wrong: when you pass df as a second iterable, map iterates over it, and iterating a DataFrame yields its column labels, so each call to poly2 receives a column name rather than the dataframe (see the short demonstration after this list).
- You have a loop that creates and initializes the variable lst, which could have been created with lst = [i for i in range(len(raster_path))] (better) or lst = list(range(len(raster_path))) (best).
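A quick illustration of the third point, using a throwaway pandas DataFrame (not your data):

import pandas as pd

df = pd.DataFrame({'a': [1, 2], 'b': [3, 4]})
print(list(df))            # ['a', 'b'] -- iterating a DataFrame yields column labels
print(list(map(len, df)))  # [1, 1]     -- each call receives a column name, not the frame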
Moving on ...
The problem is that processes, unlike threads, do not share the same address space. So if you use multiprocessing and pass df to each process, each worker will be working on and modifying its own copy of it, which is not very helpful in your case. The problem with Python threads, however, is that they do not execute Python code in parallel, since such code must acquire the so-called Global Interpreter Lock (library code written in C or C++ can be a different story). That is, the Python interpreter, when executing Python code, is not thread-safe. So multiprocessing it is. But the problem with multiprocessing is that, unlike with threads, creating new processes and moving data from one address space to another is expensive. This added expense can be more than compensated for by having code executed in parallel that would otherwise be executed serially -- but only if the executed code is CPU-intensive enough to actually overcome the additional expense. So your poly2 function cannot be too trivial in terms of CPU requirements, or performance will be worse under multiprocessing.
Finally, to overcome the separate-address-space issue, I would have your poly2 function not attempt to modify the passed-in df dataframe, but instead return results that the main process then uses to update a single df instance:
from multiprocessing import Pool, cpu_count
from functools import partial

... # Other code omitted for brevity

def poly2(i, df):  ## i is for year
    array_name, trans_name = mask(filenames_dat[i], shapes=df.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    return (i, [x['mean'] for x in zs], [x['sum'] for x in zs])

# Required by Windows (but okay even if not Windows):
if __name__ == '__main__':
    df = some_dataframe
    # Create a pool size no larger than both the number of CPU cores you have
    # and the number of tasks being submitted:
    pool_size = min(cpu_count(), len(raster_path))
    pool = Pool(pool_size)
    results = pool.map(partial(poly2, df=df), range(len(raster_path)))
    for i, mean, the_sum in results:
        df['amg' + str(filenames[i])] = mean
        df['mpg' + str(filenames[i])] = the_sum
    # Shut down the pool:
    pool.close()
    pool.join()
If df is large (and working on the assumption that poly2 no longer modifies the dataframe), then you can avoid passing it with each submitted task by initializing the global storage of each pool process with its value:
from multiprocessing import Pool, cpu_count

... # Other code omitted for brevity

def init_pool_process(the_dataframe):
    global df
    df = the_dataframe  # Initialize global variable df

# Now df is no longer an argument but read from global storage:
def poly2(i):  ## i is for year
    array_name, trans_name = mask(filenames_dat[i], shapes=df.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    return (i, [x['mean'] for x in zs], [x['sum'] for x in zs])

# Required by Windows (but okay even if not Windows):
if __name__ == '__main__':
    df = some_dataframe
    # Create a pool size no larger than both the number of CPU cores you have
    # and the number of tasks being submitted:
    pool_size = min(cpu_count(), len(raster_path))
    # Initialize global storage of each pool process with df:
    pool = Pool(pool_size, initializer=init_pool_process, initargs=(df,))
    results = pool.map(poly2, range(len(raster_path)))
    for i, mean, the_sum in results:
        df['amg' + str(filenames[i])] = mean
        df['mpg' + str(filenames[i])] = the_sum
    # Shut down the pool:
    pool.close()
    pool.join()
So now df is created once by the main process and copied once to each pool process instead of once for each task submitted by map.
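Coming back to your first query (iterating the n vector files for each of the m rasters), the same pattern extends by submitting one task per (raster index, vector index) pair. Below is a sketch only, under the assumption that the vector dataframes are collected in a list; the names vector_dfs and tasks are illustrative, not part of your code:

from itertools import product
from multiprocessing import Pool, cpu_count

def init_pool_process(dataframes):
    global vector_dfs
    vector_dfs = dataframes  # list of GeoDataFrames, e.g. [df1, df2]

def poly2(i, j):
    # i indexes the raster list, j indexes the vector list
    df = vector_dfs[j]
    array_name, trans_name = mask(filenames_dat[i], shapes=df.geometry, crop=True, nodata=np.nan)
    zs = zonal_stats(df, array_name[0], affine=trans_name, stats=['mean', 'sum'], nodata=np.nan, all_touched=True)
    return (i, j, [x['mean'] for x in zs], [x['sum'] for x in zs])

if __name__ == '__main__':
    vector_dfs = [df1, df2]  # the n vector files
    # One task per (raster, vector) combination:
    tasks = list(product(range(len(raster_path)), range(len(vector_dfs))))
    pool_size = min(cpu_count(), len(tasks))
    with Pool(pool_size, initializer=init_pool_process, initargs=(vector_dfs,)) as pool:
        results = pool.starmap(poly2, tasks)
    # Apply the returned columns back onto the matching dataframe in the main process:
    for i, j, mean, the_sum in results:
        vector_dfs[j]['amg' + str(filenames[i])] = mean
        vector_dfs[j]['mpg' + str(filenames[i])] = the_sum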