I am trying to create a multiprocessing-based program with a file cache to speed things up. The cache is empty at the start of the program and is filled in as requests for the data are made. There is also an extra set of files that are the unprocessed versions of the files loaded into the cache. The multiprocessing code I'm using looks like this:
# file_caches is a multiprocessing.Manager().dict()
# file_cache_lock is a multiprocessing.Lock()
if file_path in file_caches:
    # We have a cache hit
    file_cache_lock.acquire()
    cached = file_caches[file_path][:]
    file_cache_lock.release()
    data1 = cached[0]
    data2 = cached[1]
elif file_path.exists():
    data1 = np.load(file_path)
    data2 = get_data2()
    if file_cache_lock.acquire(False):  # Non-blocking acquire
        if file_path not in file_caches:
            file_caches[file_path] = (data1, data2)
        file_cache_lock.release()
else:
    # Load and process the original file
    data1, data2 = read_and_process(original_file_path)
    # Save the processed data so it can be cached next time
    file_path.parent.mkdir(parents=True, exist_ok=True)
    with open(file_path, "wb") as f:
        np.save(f, data1, allow_pickle=False)
    if file_cache_lock.acquire(False):  # Non-blocking acquire
        if file_path not in file_caches:
            file_caches[file_path] = (data1, data2)
        file_cache_lock.release()
However, this could result in a race condition if two (or more) processes try to get the same file a little bit apart.
Say Process A runs this code and sees that there is no cache entry and that the file to be cached hasn't been created yet, so it goes off to process the original file and create the backing file for the cache. Process B comes along right after Process A creates the file but before it finishes writing it. Process B will end up in the elif branch, where it will start reading incompletely written data. Obviously, this is a problem.
So, I'd like to add an extra field to the tuple in the cache dictionary that is a multiprocessing.Lock(), so that I'm not holding up other data being written and read while still preventing the race condition. This is not so simple, however, as I get the error:
Lock objects should only be shared between processes through inheritance
So, is there a way to dynamically create locks to add to the dictionary in this way? Or is there a better way to solve this problem?
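For what it's worth, that error comes from trying to pickle a bare multiprocessing.Lock(). Locks created through a multiprocessing.Manager() are proxy objects and, as far as I can tell, can be stored per entry in a managed dict. A minimal sketch (the key and payload are made up):

```python
import multiprocessing as mp

def use_entry(shared, key):
    # In a child process, retrieve the lock proxy from the
    # managed dict and acquire it like a normal lock.
    lock, payload = shared[key]
    with lock:
        print("child sees:", payload)

if __name__ == "__main__":
    manager = mp.Manager()
    shared = manager.dict()
    # manager.Lock() returns a picklable proxy, unlike mp.Lock(),
    # so it can live inside the managed dict alongside the data.
    shared["some_file.npy"] = (manager.Lock(), "cached data")
    p = mp.Process(target=use_entry, args=(shared, "some_file.npy"))
    p.start()
    p.join()
```

Note that every operation on the proxy is a round trip to the manager process, so per-entry manager locks trade speed for flexibility.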
CodePudding user response:
I would use a queue instead of your dictionary. Your processes read their tasks from the same queue. At the beginning, you fill the queue with all the unprocessed files. These get processed, and once one is done, the active process puts the processed file name back into the same queue. As the queue is emptied sequentially, you will never have a race condition on incomplete data.
In pseudo-code:
def input_polling(in_queue):
    # Polls the input queue and stops when "STOP" is sent;
    # get() will block until an element becomes available.
    for a in iter(in_queue.get, 'STOP'):
        if a == unprocessed:
            process(a)
            in_queue.put(a_processed)
        if a == processed:
            process(a)

def main(args):
    in_queue = mp.Queue()
    for n in range(4):
        in_process = mp.Process(target=input_polling, args=[in_queue])
        in_process.start()
    for element in list_unprocessed_files:
        in_queue.put(element)
After your processes are created and started, they sit idle until something is put into the queue. They can later be stopped by putting "STOP" into the queue (one sentinel per process).
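To make the pattern above concrete, here is one runnable way it might look; the file names and the processing step are placeholders:

```python
import multiprocessing as mp

def worker(in_queue, out_queue):
    # Block on the shared queue until the "STOP" sentinel arrives.
    for name in iter(in_queue.get, "STOP"):
        # Placeholder processing step: just tag the file name as done.
        out_queue.put(name + ".processed")

if __name__ == "__main__":
    in_queue = mp.Queue()
    out_queue = mp.Queue()
    procs = [mp.Process(target=worker, args=(in_queue, out_queue))
             for _ in range(4)]
    for p in procs:
        p.start()
    for name in ["a.dat", "b.dat", "c.dat"]:
        in_queue.put(name)
    # One "STOP" sentinel per worker shuts the pool down cleanly.
    for _ in procs:
        in_queue.put("STOP")
    for p in procs:
        p.join()
    print(sorted(out_queue.get() for _ in range(3)))
```

Because each queue item is handed to exactly one worker, no two processes ever touch the same file at the same time.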
CodePudding user response:
My first observation is that you do not need to acquire the lock to test whether a file path is in the cache and, if so, get the value(s) (see my second code version).
But the simplest (not necessarily the best) option for avoiding the race condition is to do all the caching logic only after acquiring the lock, as follows (a better option appears in the second version):
def worker(file_path, original_file_path, file_caches, file_cache_lock):
    with file_cache_lock:
        if file_path in file_caches:
            # Found in cache!
            data1, data2 = file_caches[file_path]
        elif file_path.exists():
            data1 = np.load(file_path)
            data2 = get_data2()
            file_caches[file_path] = (data1, data2)
        else:
            # Load and process the original file
            data1, data2 = read_and_process(original_file_path)
            # Save the processed data for next time
            file_path.parent.mkdir(parents=True, exist_ok=True)
            with open(file_path, "wb") as f:
                np.save(f, data1, allow_pickle=False)
            file_caches[file_path] = (data1, data2)
    # The with-statement releases the lock automatically.
    ...  # rest of code that uses data1 and data2 omitted
You may be concerned that if the data is not already in the cache, then you will be holding the lock while reading and possibly writing multiple files. So if you do not want to hold up other processes that may be trying to acquire the lock, at the risk of doing some unnecessary file I/O, then the following code does minimal locking of the cache. Ultimately, the only locking required is when a process is writing the file to file_path or loading the file from file_path, to avoid reading a partially created file:
def worker(file_path, original_file_path, file_caches, file_cache_lock):
    if file_path in file_caches:
        # Found in cache!
        data1, data2 = file_caches[file_path]
    elif file_path.exists():
        # Now we must acquire the lock in case the file is being written:
        with file_cache_lock:
            # Check one more time to see if loading is still
            # necessary:
            if file_path in file_caches:
                # Another process has created the cache entry:
                data1, data2 = file_caches[file_path]
            else:
                data1 = np.load(file_path)
                data2 = get_data2()
                file_caches[file_path] = (data1, data2)
    else:
        # Load and process the original file
        data1, data2 = read_and_process(original_file_path)
        # Did someone else create the cache entry in the meantime?
        # Now we must acquire the lock:
        with file_cache_lock:
            # Check one more time to see if the write is still necessary:
            if file_path not in file_caches:
                # It's okay to update the cache now:
                file_caches[file_path] = (data1, data2)
                file_path.parent.mkdir(parents=True, exist_ok=True)
                with open(file_path, "wb") as f:
                    np.save(f, data1, allow_pickle=False)
    ...  # rest of code that uses data1 and data2 omitted