I have some legacy code that needs to access the same data file across multiple threads and processes. I am trying to implement locking to protect the data.
Multithreaded
import contextlib
import threading
FILE_PATH = "foo.txt"
USE_GLOBAL_LOCK = False
if USE_GLOBAL_LOCK:
global_lock = threading.Lock()
else:
global_lock = contextlib.nullcontext()
def do_write_then_read(results) -> None:
# Write to disk
data = "FOO"
with global_lock:
with open(FILE_PATH, "w") as f:
f.write(data)
# Read from disk
data = None
with global_lock:
with open(FILE_PATH, "r") as f:
data = f.read()
results.append(data)
def run_multithreaded() -> None:
results = []
threads = []
for _ in range(10):
threads.append(threading.Thread(target=do_write_then_read, args=[results]))
for t in threads:
t.start()
for t in threads:
t.join()
print(results)
The output is usually correct:
['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
but sometimes there is missing data. I believe this is due to a race condition between the read()
and the write()
, which initially truncates the file:
['', '', 'FOO', '', '', 'FOO', '', 'FOO', 'FOO', 'FOO']`
Setting USE_GLOBAL_LOCK
to protect the file access using a threading.Lock
indeed fixes the problem.
Multithreaded and multiprocess
However, running this across multiple processes again results in missing data.
Here's some test code that forks subprocesses that each invoke the above run_multithreaded()
method.
import fcntl
import subprocess
def run_multiprocess() -> None:
processes = []
for _ in range(3):
CMD = "python3 -c 'import foo; foo.run_multithreaded()'"
processes.append(subprocess.Popen(CMD, shell=True))
for p in processes:
p.wait()
Output with missing data:
['', '', 'FOO', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['', '', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
Here we redefine do_write_then_read()
to add filesystem-based locks (flock
) so that the file can be locked across multiple processes:
def do_write_then_read(results) -> None:
# write
data = "FOO"
with global_lock:
with open(FILE_PATH, "w") as f:
# Acquire file lock
fd = f.fileno()
fcntl.flock(fd, fcntl.LOCK_EX)
f.write(data)
# Exiting the context closes file and releases lock
# read
data = None
with global_lock:
with open(FILE_PATH, "r") as f:
# Acquire file lock
fd = f.fileno()
fcntl.flock(fd, fcntl.LOCK_EX)
data = f.read()
# Exiting the context closes file and releases lock
results.append(data)
However, this doesn't fix the problem, and I can't figure out why, no matter what I try :P
I'm on Mac / Linux with Python 3.9.
CodePudding user response:
As VPfB noted, you have to postpone the truncation until after you acquire the lock. You can just replace threading.Lock
with multiprocessing.Lock
. Depending on how the processes get spawned, sharing a lock may be more or less easy.
Alternatively, you could use a separate lock file.
with open(FILE_PATH ".lock", "w") as lockfile:
fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX)
with open(FILE_PATH, "w") as f:
f.write(data)
with open(FILE_PATH ".lock", "w") as lockfile:
fcntl.flock(lockfile.fileno(), fcntl.LOCK_SH)
with open(FILE_PATH, "r") as f:
data = f.read()
There is no good way to remove the lock file unless you know all processes have left this code section. We could come up with more complicated schemes but at that point switching to the multiprocessing module is probably easier.
CodePudding user response:
The problem is that the global locks are shared in multiprocessing in all threads (i.e. the threads will literally access the same lock across the multiple threads), but you have to explicitly pass them to the processes in multiprocessing, as you'll end up with copies here that are otherwise not connected and live in different process spaces.
See this example from the docs and for more information on that issue.