How to write to a shared dictionary faster in Python?


I am processing some documents and would like to add the result for each document to a shared dictionary. I am using a Python multiprocessing Manager to create a dict shared across processes and then write to it, but the multiprocessing version is 27x slower than the single-core one.

import os
import multiprocessing as mp

def pooler(self, payload):
    # Flatten the payload dict into [key, document] pairs
    lis_docs = []
    for key in payload.keys():
        lis_docs.append([key, payload[key]])

    # Parallel-process the documents, writing results into a managed dict
    manager = mp.Manager()
    processed_docs = manager.dict()
    pool = mp.Pool(os.cpu_count() - 1)
    for doc in lis_docs:
        pool.apply_async(parallel_process_for_shortq, args=(self, doc, processed_docs))

    pool.close()
    pool.join()

def parallel_process_for_shortq(self, doc, processed_doc):
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)

    keywords = get_keywords(self.nlp, modified_text, 4, self.s2v, self.fdist,
                            self.normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)

    # Write the result for this document into the shared dict
    processed_doc[doc[0]] = [keywords, keyword_sentence_mapping]

I have to process the documents in lis_docs and then write the results to the shared dictionary processed_docs. Each document appears in the list only once, so there is no overwriting of data between processes.

Is there a way to pool results from multiple processes and then write to a dictionary?
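For illustration, here is a minimal, self-contained sketch of one such approach: collect the AsyncResult handles returned by apply_async and assemble an ordinary dict in the parent process, with no Manager involved. The worker process_doc, the helper pool_results_into_dict, and the sample payload below are made up for illustration and stand in for the real document processing.

import os
import multiprocessing as mp

def process_doc(key, text):
    # Stand-in worker: return (key, result) instead of writing to a shared dict
    return key, len(text.split())

def pool_results_into_dict(payload):
    with mp.Pool(max(1, os.cpu_count() - 1)) as pool:
        # Submit one task per document and keep the AsyncResult handles
        handles = [pool.apply_async(process_doc, args=(k, v)) for k, v in payload.items()]
        # .get() blocks until each task finishes and returns its (key, value) tuple
        pairs = [h.get() for h in handles]
    # Assemble an ordinary dict in the parent process; no shared state is needed
    return dict(pairs)

if __name__ == "__main__":
    print(pool_results_into_dict({"doc1": "some text here", "doc2": "more text"}))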

Update: I used time.time() to measure the elapsed time of the pool.apply_async calls. Without parallel processing the elapsed time is 1.76 seconds; with pool.apply_async it increases to 27.50 seconds.

There are only three documents in my lis_docs. I have to process around 1000 documents per batch.

I read that sharing a dictionary is slow, so instead of sharing a dictionary I tried to collect the results from parallel_process_for_shortq using starmap.

from itertools import repeat
import os
import time
import multiprocessing as mp

def pooler(self, payload):
    lis_docs = []
    for key in payload.keys():
        lis_docs.append([key, payload[key]])

    manager = mp.Manager()  # left over from the shared-dict attempt; unused here
    n_cores = os.cpu_count()
    print(n_cores)
    start = time.time()

    # starmap returns a list of [key, keywords, mapping] results from the workers
    with mp.Pool(n_cores - 1) as pool:
        res = pool.starmap(parallel_process_for_shortq, zip(repeat(self), lis_docs))
    # The with-statement closes the pool on exit, so no explicit close()/join() is needed

def parallel_process_for_shortq(self, doc):
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)

    keywords = get_keywords(self.nlp, modified_text, 4, self.s2v, self.fdist,
                            self.normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)

    # Return the key together with its results so the caller can build the dict
    return [doc[0], keywords, keyword_sentence_mapping]

But this also takes 27 seconds to process.

Edit 2: When processing 900 documents at once there is a significant speed-up: the processing time is now 1.7 seconds per document, but that is still slower than the single-process time of 0.6 seconds per document.

CodePudding user response:

Change the chunksize argument in pool.map. A chunksize that is too low when the individual tasks are not intensive enough leads to a huge drop in performance. I was able to get a 15x speed-up processing 1000 docs in parallel on 20 threads.
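For example, here is a minimal sketch of passing chunksize to starmap. The worker cheap_task, the 1000 dummy docs, and the chunksize value of 50 are made up for illustration; larger chunks mean each worker receives bigger batches of tasks, so less time is spent on inter-process communication.

from itertools import repeat
import os
import multiprocessing as mp

def cheap_task(self, doc):
    # Stand-in for parallel_process_for_shortq; cheap per-document work
    return doc[0], len(str(doc[1]))

if __name__ == "__main__":
    docs = [[i, "some text"] for i in range(1000)]
    with mp.Pool(os.cpu_count()) as pool:
        # chunksize controls how many tasks are handed to a worker at once
        results = pool.starmap(cheap_task, zip(repeat(None), docs), chunksize=50)
    print(dict(results))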

CodePudding user response:

Since no process needs to read keys or values from the dictionary, I don't see why parallel_process_for_shortq doesn't simply return, as a tuple, the key and the value to be stored under it back to the main process, which can then assemble the dictionary from these tuples.

As an aside, I don't understand what the self arguments are for, since self is not defined and you don't appear to be using classes. See How to create a Minimal, Reproducible Example.

from itertools import repeat
import os
import multiprocessing as mp

def pooler(self, payload):
    lis_docs = []
    for key in payload.keys():
        lis_docs.append([key, payload[key]])

    # Parallel process: workers return (key, value) tuples instead of
    # writing into a shared dictionary
    pool = mp.Pool(os.cpu_count())  # use all processors
    results = pool.starmap(parallel_process_for_shortq, zip(repeat(self), lis_docs))
    # Assemble the dictionary in the main process
    processed_docs = {k: v for k, v in results}
    pool.close()
    pool.join()
    return processed_docs

def parallel_process_for_shortq(self, doc):
    sentences = tokenize_sentences(doc[1])
    modified_text = " ".join(sentences)

    keywords = get_keywords(self.nlp, modified_text, 4, self.s2v, self.fdist,
                            self.normalized_levenshtein, len(sentences))
    keyword_sentence_mapping = get_sentences_for_keyword(keywords, sentences)
    # Return key and value
    return doc[0], [keywords, keyword_sentence_mapping]