I have a df; you can reproduce it by copy-pasting:
import pandas as pd
from io import StringIO
df = """
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
"""
df = pd.read_csv(StringIO(df.strip()), sep=r'\s+')
Output:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
Then I have 2 functions to build new columns for this df:
def func1():
    df['r1'] = df['test'] + 1
    return df['r1']

def func2():
    df['r2'] = df['RB'] + 1
    return df['r2']
After I call these 2 functions:
func1()
func2()
Output:
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
But when I try to use multiprocessing, I can't get the new columns:
import multiprocessing
if __name__ == '__main__':
    p1 = multiprocessing.Process(target=func1)
    p2 = multiprocessing.Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
Output:
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
The multiprocessing run didn't produce the new columns from the functions. Can anyone help?
CodePudding user response:
If you use a multiprocessing.Pool and rewrite your function to be more generic, you can use it to map an input to an output:
>>> def func(series):
...     return series + 1
...
>>> with multiprocessing.Pool(2) as p:
...     dat = p.map(func, [df['test'].rename('r1'), df['RB'].rename('r2')])
...
Then, outside of the parallel processing, modify the dataframe with the obtained results, using e.g. df.join():
>>> df.join(dat)
ValOption RB test r1 r2
0 SLA 4 3 4 5
1 AC 5 4 5 6
2 SLA 5 5 6 6
3 AC 2 4 5 3
4 SLA 5 5 6 6
5 AC 3 4 5 4
6 SLA 4 3 4 5
Otherwise, your best bet for getting results back would be a task queue; see the example at the bottom of the multiprocessing page. Again, you want to do the computations in the tasks, not modify any shared data structure; after the tasks execute, you can join the results together again, as in the sketch below.
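A minimal sketch of that task-queue pattern (the worker function and queue names are my own illustration, not from the docs; it assumes the df from the question): the workers compute on pickled copies, and only the parent touches df.

import multiprocessing as mp

def worker(tasks, results):
    # Pull (name, series) pairs until the None sentinel; compute and push results back.
    for name, series in iter(tasks.get, None):
        results.put((name, series + 1))

if __name__ == '__main__':
    tasks, results = mp.Queue(), mp.Queue()
    workers = [mp.Process(target=worker, args=(tasks, results)) for _ in range(2)]
    for w in workers:
        w.start()
    tasks.put(('r1', df['test']))
    tasks.put(('r2', df['RB']))
    for _ in workers:
        tasks.put(None)  # one sentinel per worker
    for _ in range(2):  # collect both computed columns in the parent
        name, series = results.get()
        df[name] = series
    for w in workers:
        w.join()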
More complex solutions would likely have to resort to multiprocessing.Manager subclasses, as pandas Series and DataFrames are complex objects not suited for the multiprocessing.sharedctypes.* options.
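For completeness, a minimal sketch of the Manager route (the compute function and the shared dict are my own illustration, assuming the df from the question):

from multiprocessing import Manager, Process

def compute(shared, series, name):
    # Store the computed Series in the managed dict instead of mutating df.
    shared[name] = series + 1

if __name__ == '__main__':
    with Manager() as m:
        shared = m.dict()
        p1 = Process(target=compute, args=(shared, df['test'], 'r1'))
        p2 = Process(target=compute, args=(shared, df['RB'], 'r2'))
        p1.start()
        p2.start()
        p1.join()
        p2.join()
        for name, series in shared.items():
            df[name] = series  # apply the results back in the parent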
CodePudding user response:
OK, then change your code by creating a class:
from multiprocessing import Process

class Test:
    def __init__(self, df):
        self.df = df

    def func1(self):
        self.df['r1'] = self.df['test'] + 1

    def func2(self):
        self.df['r2'] = self.df['RB'] + 1

p1 = Process(target=Test(df).func1)
p2 = Process(target=Test(df).func2)
p1.start()
p2.start()
p1.join()
p2.join()
This should work, for sure
CodePudding user response:
Pandas dataframes are definitely not thread-safe. Think about what would happen if you were halfway through func1 when func2 finished! (And pandas is definitely not atomic.)
Fortunately, multiprocessing simply copies the variable and works on the copy (actually, it serialises the variable and sends it to the child process). So if you want to do work with multiprocessing, you adopt this workflow:
- break task into steps
- worker functions take steps and compute results
- compile results and apply them back to the object
Have a look at some tutorials for multiprocessing.Pool to see how this is done; a minimal sketch of the workflow follows.
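A sketch of those three steps using multiprocessing.Pool (the function name add_one is mine; it assumes the df from the question):

import multiprocessing as mp

def add_one(series):
    # The worker computes a result from its input and touches no shared state.
    return series + 1

if __name__ == '__main__':
    # Break the task into steps: one renamed input column per result column.
    steps = [df['test'].rename('r1'), df['RB'].rename('r2')]
    with mp.Pool(2) as pool:
        results = pool.map(add_one, steps)
    # Compile the results and apply them back to the dataframe.
    df = df.join(results)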
CodePudding user response:
I am guessing you're using a notebook and running the cell containing if __name__ == '__main__':?
If so, just run the processes outside that guard, like this:
import multiprocessing
p1 = multiprocessing.Process(target=func1)
p2 = multiprocessing.Process(target=func2)
p1.start()
p2.start()
p1.join()
p2.join()
Or keep the guard, but in that case execute the code as a Python file; a complete script sketch follows.
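For reference, a standalone script sketch of that second option (the setup is copied from the question; note that, as the first answer explains, the functions still modify the child processes' own copies of df):

# run as: python script.py
import multiprocessing
import pandas as pd
from io import StringIO

data = """
ValOption RB test
0 SLA 4 3
1 AC 5 4
2 SLA 5 5
3 AC 2 4
4 SLA 5 5
5 AC 3 4
6 SLA 4 3
"""
df = pd.read_csv(StringIO(data.strip()), sep=r'\s+')

def func1():
    df['r1'] = df['test'] + 1

def func2():
    df['r2'] = df['RB'] + 1

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=func1)
    p2 = multiprocessing.Process(target=func2)
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print(df)  # the parent's df is unchanged; each child modified its own copy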