I am processing some documents and would like to add the results to a shared dictionary. I am using a Python multiprocessing Manager to create a dict shared across processes and then write to it, but the multiprocessing version is 27x slower than the single-core one.
def pooler(self, payload):
    # flatten the payload dict into a list of [key, document] pairs
    lis_docs = []
    for keys in payload.keys():
        lis_docs.append([keys, payload[keys]])

    # parallel process on this
    manager = mp.Manager()
    processed_docs = manager.dict()  # proxy dict shared across worker processes
    pool = mp.Pool(os.cpu_count() - 1)
    for doc in lis_docs:
        pool.apply_async(parallel_process_for_shortq, args=(self, doc, processed_docs))
    pool.close()
    pool.join()
def parallel_process_for_shortq(self, doc, processed_doc):
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)
    keywords = get_keywords(self.nlp, modified_text, 4, self.s2v, self.fdist,
                            self.normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)
    processed_doc[doc[0]] = [keywords, keyword_sentence_mapping]
I have to process the documents in lis_docs and write the results to the shared dictionary processed_docs. Each document appears in the list only once, so there is no overwriting of data between processes.
Is there a way to pool results from multiple processes and then write them to a dictionary?
Update: I used time.time() to measure the elapsed time around the pool.apply_async calls. Without parallel processing the elapsed time is 1.76 seconds; with pool.apply_async it increases to 27.50 seconds.
There are only three documents in my lis_docs at the moment, but I have to process around 1000 documents per batch.
I read that sharing a dictionary is slow, so rather than sharing a dictionary I tried to collect the results from parallel_process_for_shortq using starmap.
def pooler(self, payload):
    lis_docs = []
    for keys in payload.keys():
        lis_docs.append([keys, payload[keys]])

    manager = mp.Manager()
    n_cores = os.cpu_count()
    print(n_cores)
    start = time.time()
    with mp.Pool(n_cores - 1) as pool:
        res = pool.starmap(parallel_process_for_shortq, zip(repeat(self), lis_docs))
        pool.close()
        pool.join()
def parallel_process_for_shortq(self, doc):
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)
    keywords = get_keywords(self.nlp, modified_text, 4, self.s2v, self.fdist,
                            self.normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)
    return [doc[0], keywords, keyword_sentence_mapping]
But this also takes 27 seconds to process.
Edit 2: On processing 900 documents at once, there is a significant speed-up. The processing time is now 1.7 seconds per document, but that is still slower than the single-process time of 0.6 seconds.
CodePudding user response:
Change the chunksize in pool.map. When a single task is not computationally intensive enough, a small chunksize causes a huge drop in performance because of the per-task dispatch overhead. I was able to get a 15x speed-up processing 1000 docs in parallel on 20 threads.
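For illustration, here is a minimal self-contained sketch (the per-document function and data below are made up, not the asker's code) showing how an explicit chunksize is passed to pool.map so each worker receives a batch of documents per dispatch:

import math
import multiprocessing as mp

def process_doc(doc):
    # hypothetical lightweight stand-in for the real per-document work
    key, text = doc
    return key, text.upper()

if __name__ == "__main__":
    docs = [(str(i), "some document text") for i in range(1000)]
    n_workers = max(1, mp.cpu_count() - 1)
    # hand each worker a batch of documents per dispatch instead of one at a time
    chunk = math.ceil(len(docs) / (n_workers * 4))
    with mp.Pool(n_workers) as pool:
        results = pool.map(process_doc, docs, chunksize=chunk)
    processed_docs = dict(results)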
CodePudding user response:
Since no process needs to read keys or values from the dictionary, I don't see why parallel_process_for_shortq doesn't just return the key and the value to be stored as a tuple back to the main process, which can then assemble the dictionary from these tuples.
As an aside, I don't get what the self arguments are supposed to be for, since self is not defined and you do not seem to be using classes. See How to create a Minimal, Reproducible Example.
def pooler(self, payload):
    lis_docs = []
    for keys in payload.keys():
        lis_docs.append([keys, payload[keys]])

    # parallel process on this
    pool = mp.Pool(os.cpu_count())  # use all processors
    # each worker returns a (key, value) tuple; the main process assembles the dict
    results = pool.starmap(parallel_process_for_shortq, zip(repeat(self), lis_docs))
    processed_docs = {k: v for k, v in results}
    pool.close()
    pool.join()
def parallel_process_for_shortq(self, doc):
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)
    keywords = get_keywords(self.nlp, modified_text, 4, self.s2v, self.fdist,
                            self.normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)
    # return key and value
    return doc[0], [keywords, keyword_sentence_mapping]
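To show the same return-the-tuples pattern in isolation, here is a self-contained sketch with placeholder processing (the helper and data names are made up):

import multiprocessing as mp

def process_for_shortq(doc):
    # placeholder for tokenize_sentences / get_keywords etc.
    key, text = doc
    keywords = text.split()[:4]
    return key, keywords

if __name__ == "__main__":
    payload = {"doc1": "first document text", "doc2": "second document text"}
    lis_docs = list(payload.items())
    with mp.Pool() as pool:
        results = pool.map(process_for_shortq, lis_docs)
    processed_docs = dict(results)  # {'doc1': [...], 'doc2': [...]}
    print(processed_docs)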