Python concurrency with concurrent.futures.ThreadPoolExecutor

Time:11-03

Consider the following snippet:

import concurrent.futures
import time
from random import random

class Test(object):
    def __init__(self):
        self.my_set = set()

    def worker(self, name):
        temp_set = set()

        temp_set.add(name)
        temp_set.add(name*10)
        time.sleep(random() * 5)
        temp_set.add(name*10 + 1)

        self.my_set = self.my_set.union(temp_set) # question 1
        return name

    def start(self):
        result = []
        names = [1,2,3,4,5,6,7]
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(names)) as executor:
            futures = [executor.submit(self.worker, x) for x in names]
            for future in concurrent.futures.as_completed(futures):
                result.append(future.result()) # question 2

  1. Is there a chance self.my_set can become corrupted via the line marked "question 1"? I believe union is atomic, but couldn't the assignment be a problem?

  2. Is there a problem on the line marked "question 2"? I believe the list append is atomic, so perhaps this is ok.

I've read these docs:

https://docs.python.org/3/library/stdtypes.html#set
https://web.archive.org/web/20201101025814id_/http://effbot.org/zone/thread-synchronization.htm
Is Python variable assignment atomic?
https://docs.python.org/3/glossary.html#term-global-interpreter-lock

and executed the snippet code provided in this question, but I can't find a definitive answer to how concurrency should work in this case.

CodePudding user response:

Regarding question 1: Think about what's going on here:

self.my_set = self.my_set.union(temp_set)

There's a sequence of at least three distinct steps:

  1. The worker reads self.my_set, which gives it a reference to the current set object (not a copy of it).
  2. The union function constructs a new set.
  3. The worker assigns self.my_set to refer to the newly constructed set.

So what happens if two or more workers concurrently try to do the same thing? (note: it's not guaranteed to happen this way, but it could happen this way.)

  1. Each of them could grab a reference to the original my_set.
  2. Each of them could compute a new set, consisting only of the original members of my_set plus its own contribution.
  3. Each of them could assign its new set to the my_set variable.

The problem is in step three. If it happened this way, each new set would contain only the original members plus the contribution of the one worker that created it. No single set would contain the contributions of all the workers. When it's all over, my_set would refer to just one of those new sets (whichever thread performed the assignment last would "win"), and all of the other new sets would be thrown away.
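The interleaving described above is normally a matter of timing, but it can be forced for illustration. The following is only a sketch: the Shared class, lossy_update, and run_lossy are hypothetical names, and the threading.Barrier is there purely to make every thread read the shared reference before any thread writes it back.

```python
import threading

class Shared:
    """Hypothetical stand-in for the Test object in the question."""
    def __init__(self):
        self.my_set = set()

def lossy_update(shared, barrier, value):
    snapshot = shared.my_set            # step 1: read the current reference
    barrier.wait()                      # force all threads to finish step 1 first
    new_set = snapshot.union({value})   # step 2: build a new set from the stale snapshot
    shared.my_set = new_set             # step 3: rebind; the last writer wins

def run_lossy(values):
    shared = Shared()
    barrier = threading.Barrier(len(values))
    threads = [
        threading.Thread(target=lossy_update, args=(shared, barrier, v))
        for v in values
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return shared.my_set

final = run_lossy([1, 2, 3])
print(final)  # a set with exactly one element; which one depends on scheduling
```

Three threads each contributed a value, yet only one value survives, because every thread built its new set from the same empty snapshot.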

One way to prevent that would be to use mutual exclusion to keep other threads from trying to compute their new sets and update the shared variable at the same time:

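import threading

class Test(object):
    def __init__(self):
        self.my_set = set()
        # Guards every read-modify-write of self.my_set.
        self.my_set_mutex = threading.Lock()

    def worker(self, name):
        ...
        # Only one thread at a time may read, union, and rebind.
        with self.my_set_mutex:
            self.my_set = self.my_set.union(temp_set)
        return name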
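For reference, here is one way the whole snippet from the question might look with the lock in place. This is a sketch, not a definitive version: the sleep is shortened to keep the run quick, start is changed to return result, and the expected set at the end is added only to check the outcome. All three changes are assumptions that are not in the original code.

```python
import concurrent.futures
import threading
import time
from random import random

class Test(object):
    def __init__(self):
        self.my_set = set()
        self.my_set_mutex = threading.Lock()

    def worker(self, name):
        temp_set = set()
        temp_set.add(name)
        temp_set.add(name * 10)
        time.sleep(random() * 0.05)  # shortened delay (assumption, for a quick demo)
        temp_set.add(name * 10 + 1)

        # The read, the union, and the rebinding happen as one protected unit.
        with self.my_set_mutex:
            self.my_set = self.my_set.union(temp_set)
        return name

    def start(self):
        result = []
        names = [1, 2, 3, 4, 5, 6, 7]
        with concurrent.futures.ThreadPoolExecutor(max_workers=len(names)) as executor:
            futures = [executor.submit(self.worker, x) for x in names]
            for future in concurrent.futures.as_completed(futures):
                result.append(future.result())
        return result  # returned here only so the caller can inspect it

test = Test()
completed = test.start()

# Every worker contributes name, name*10 and name*10 + 1.
expected = {v for n in range(1, 8) for v in (n, n * 10, n * 10 + 1)}
print(test.my_set == expected)
```

With the lock held around the update, no contribution can be overwritten, so the final set holds all 21 values regardless of how the threads are scheduled.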

Regarding question 2: It doesn't matter whether or not appending to a list is "atomic." The result variable is local to the start method, and the as_completed loop, including every call to result.append, runs in the thread that called start. The worker threads never touch the list. In the code that you've shown, the list to which result refers is inaccessible to any thread other than the one that created it. There can't be any interference between threads unless you share the list with other threads.
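The same reasoning suggests an alternative for question 1 that needs no lock at all: let each worker return its own private set and merge the sets in the calling thread. This is a different design from the one in the question, offered only as a sketch; the module-level worker and the collect helper are hypothetical names.

```python
import concurrent.futures

def worker(name):
    # Builds and returns a private set; no shared state is touched here.
    return {name, name * 10, name * 10 + 1}

def collect(names):
    merged = set()
    with concurrent.futures.ThreadPoolExecutor(max_workers=len(names)) as executor:
        futures = [executor.submit(worker, x) for x in names]
        for future in concurrent.futures.as_completed(futures):
            # This loop runs only in the calling thread, so merged is never shared.
            merged |= future.result()
    return merged

merged = collect([1, 2, 3, 4, 5, 6, 7])
print(len(merged))  # 21
```

Because only one thread ever reads or writes merged, there is nothing to synchronize.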
