How to fix concurrency issue with multiprocessing?


My serial function behaves differently from a similarly designed parallel function.

def update(sharedDict, node, ...):
    for neighbor in weightNode:
        sharedDict[neighbor] += <positive float in (0, 1)>
    sharedDict[node] += <positive float in (0, 1)>

This is the worker function run by each process. Each update adds a positive float that does not depend on any value in sharedDict. No keys are added or removed.
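For concreteness, here is one hypothetical way the elided pieces could look for a PageRank-style update. The extra parameters (lastDict, wDigraph, s, nodeCount) and the weighting formula are assumptions for illustration, not the actual code:

def update(sharedDict, node, lastDict, wDigraph, s, nodeCount):
    # Hypothetical PageRank-style worker: spread the node's previous rank
    # to its neighbors in proportion to edge weight (assumed formula).
    total = sum(wDigraph[node].values())
    for neighbor, weight in wDigraph[node].items():
        # Read-modify-write on the shared dict: a get followed by a set.
        sharedDict[neighbor] += s * lastDict[node] * weight / total
    sharedDict[node] += (1.0 - s) / nodeCount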

def pSerial(graph):
    ...  # setup left out for readability
    for i in range(100):
        last_serialDict = serialDict
        serialDict = dict.fromkeys(last_serialDict.keys(), 0.0)

        s = <positive float in (0, 1)>  # used in the later positive floats
        for node in serialDict:
            for neighbor in wDigraph[node]:
                serialDict[neighbor] += <positive float in (0, 1)>
            serialDict[node] += <positive float in (0, 1)>

        err = sum(abs(serialDict[node] - last_serialDict[node]) for node in serialDict)
        if err < nodeCount * 0.000001:
            return serialDict
    raise RuntimeError('failed to converge in 100 iterations')

This is the serial implementation of the algorithm. Note that the worker function's body is identical to the nested for loop.

def pParallel(graph):
    ...  # setup left out for readability
    with Manager() as manager:
        parallelDict = dict.fromkeys(wDigraph, 1.0 / nodeCount)  # from weighted graph

        for i in range(100):
            lastParallel = parallelDict
            parallelDict = dict.fromkeys(lastParallel.keys(), 0.0)

            s = <positive float in (0, 1)>

            pool = Pool()
            sharedDict = manager.dict(parallelDict)
            pool.starmap(update, [(sharedDict, node, ...) for node in parallelDict])

            pool.close()
            pool.join()

            parallelDict = dict(sharedDict)

            err = sum(abs(parallelDict[node] - lastParallel[node]) for node in parallelDict)
            if err < nodeCount * 0.000001:
                return parallelDict
    raise RuntimeError('failed to converge in 100 iterations')

With this function computing the PageRank of variable-size graphs in parallel, the parallel version is not only slower than the serial version (programmed the same way) but also does not converge in the same number of iterations (a concurrency issue).

What is the issue with the code that is causing this?

CodePudding user response:

It is not safe to update the same dict in parallel from multiple processes/threads. On a Manager dict, sharedDict[neighbor] += x is not atomic: it expands to a __getitem__ followed by a __setitem__, two separate round trips to the manager process. If another worker writes to the same key between those two calls, its update is lost. This race condition is why the parallel version converges differently. The workers need to write to disjoint places to avoid it (they can safely read the same entries). Adding new keys to, or removing existing keys from, the same shared dict can also race, because the table may need to be rehashed internally.
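A minimal sketch of one fix along those lines, reusing the hypothetical worker signature from above: each process builds and returns its own local mapping of contributions, and the parent merges them after the pool finishes, so no two processes ever write to the same dict:

from collections import Counter
from multiprocessing import Pool

def update_local(node, lastDict, wDigraph, s, nodeCount):
    # Accumulate into a private Counter and return it; no shared state
    # is mutated, so there is nothing to race on.
    local = Counter()
    total = sum(wDigraph[node].values())
    for neighbor, weight in wDigraph[node].items():
        local[neighbor] += s * lastDict[node] * weight / total
    local[node] += (1.0 - s) / nodeCount
    return local

# Inside the iteration loop of pParallel, replacing the Manager/sharedDict part:
with Pool() as pool:
    partials = pool.starmap(
        update_local,
        [(node, lastParallel, wDigraph, s, nodeCount) for node in lastParallel])

merged = Counter()
for part in partials:
    merged.update(part)  # Counter.update adds values key-wise

parallelDict = dict.fromkeys(lastParallel, 0.0)
parallelDict.update(merged)

As a side benefit, this removes the per-item round trips to the manager process, which are a large part of why the parallel version is slower. Creating a fresh Pool (and manager.dict) on every iteration likely adds further overhead, so reusing one pool across iterations should help as well.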
