Using Python flock() across multiple processes

Time:05-27

I have some legacy code that needs to access the same data file across multiple threads and processes. I am trying to implement locking to protect the data.

Multithreaded

import contextlib
import threading

FILE_PATH = "foo.txt"

USE_GLOBAL_LOCK = False

if USE_GLOBAL_LOCK:
    global_lock = threading.Lock()
else:
    global_lock = contextlib.nullcontext()

def do_write_then_read(results) -> None:
    # Write to disk
    data = "FOO"
    with global_lock:
        with open(FILE_PATH, "w") as f:
            f.write(data)

    # Read from disk
    data = None
    with global_lock:
        with open(FILE_PATH, "r") as f:
            data = f.read()
    results.append(data)

def run_multithreaded() -> None:
    results = []

    threads = []
    for _ in range(10):
        threads.append(threading.Thread(target=do_write_then_read, args=[results]))
    for t in threads:
        t.start()
    for t in threads:
        t.join()

    print(results)

The output is usually correct:

 ['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']

but sometimes there is missing data. I believe this is due to a race condition between the read and the write: opening the file with "w" truncates it immediately, so a concurrent read() can see an empty file before write() has run:

 ['', '', 'FOO', '', '', 'FOO', '', 'FOO', 'FOO', 'FOO']

Setting USE_GLOBAL_LOCK = True to protect the file access with a threading.Lock does fix the problem.
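To make the suspected race concrete, here is a small sketch (the helper names are made up for illustration) showing that the file is already empty as soon as open(..., "w") returns, before any write() call. Any reader that gets in during that window sees ''.

```python
import os
import tempfile


def size_after_open_for_write(path: str) -> int:
    """Return the on-disk size right after open(path, "w"), before any write()."""
    with open(path, "w"):
        # Nothing has been written yet, but open() has already truncated the file.
        return os.path.getsize(path)


def demo_truncation() -> tuple:
    """Return (size before reopening, size right after reopening with "w")."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "foo.txt")
        with open(path, "w") as f:
            f.write("FOO")
        before = os.path.getsize(path)            # file holds "FOO"
        during = size_after_open_for_write(path)  # file was emptied by open()
        return before, during
```

Calling demo_truncation() should show a non-zero size before and a zero size during, which is exactly the window a concurrent reader can fall into.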

Multithreaded and multiprocess

However, running this across multiple processes again results in missing data.

Here's some test code that spawns subprocesses, each of which invokes the run_multithreaded() function above.

import fcntl
import subprocess

def run_multiprocess() -> None:
    processes = []
    for _ in range(3):
        CMD = "python3 -c 'import foo; foo.run_multithreaded()'"
        processes.append(subprocess.Popen(CMD, shell=True))
    for p in processes:
        p.wait()

Output with missing data:

['', '', 'FOO', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['', '', '', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']
['FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO', 'FOO']

Here we redefine do_write_then_read() to add filesystem-based locks (flock) so that the file can be locked across multiple processes:

def do_write_then_read(results) -> None:
    # write
    data = "FOO"
    with global_lock:
        with open(FILE_PATH, "w") as f:
            # Acquire file lock
            fd = f.fileno()
            fcntl.flock(fd, fcntl.LOCK_EX)

            f.write(data)
            # Exiting the context closes file and releases lock

    # read
    data = None
    with global_lock:
        with open(FILE_PATH, "r") as f:
            # Acquire file lock
            fd = f.fileno()
            fcntl.flock(fd, fcntl.LOCK_EX)

            data = f.read()
            # Exiting the context closes file and releases lock
    results.append(data)

However, this doesn't fix the problem, and I can't figure out why, no matter what I try :P

I'm on Mac / Linux with Python 3.9.

CodePudding user response:

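As VPfB noted, you have to postpone the truncation until after you acquire the lock: open(FILE_PATH, "w") truncates the file before flock() is ever called, so another process can still read an empty file. Another option is to replace threading.Lock with multiprocessing.Lock. Depending on how the processes get spawned, sharing that lock may be more or less easy.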
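On the first point, one way to postpone the truncation might look like the sketch below. It assumes a Unix system (fcntl) and that opening in "a" mode is acceptable, since "a" creates the file if needed but does not truncate on open. The function names locked_write and locked_read are hypothetical, not from the question.

```python
import fcntl


def locked_write(path: str, data: str) -> None:
    """Replace the file's contents while holding an exclusive flock."""
    # "a" does not truncate on open, so the old contents stay intact
    # until we actually hold the lock.
    with open(path, "a") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_EX)
        f.truncate(0)   # safe now: readers are blocked by the lock
        f.write(data)
        f.flush()       # push data to the OS before the lock is released
        # Closing the file releases the lock.


def locked_read(path: str) -> str:
    """Read the whole file while holding a shared flock."""
    with open(path, "r") as f:
        fcntl.flock(f.fileno(), fcntl.LOCK_SH)
        return f.read()
```

Because each open() creates its own open file description, these locks should also exclude other threads in the same process, so the threading.Lock would no longer be strictly needed for this pattern.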

Alternatively, you could use a separate lock file.

with open(FILE_PATH + ".lock", "w") as lockfile:
    fcntl.flock(lockfile.fileno(), fcntl.LOCK_EX)
    with open(FILE_PATH, "w") as f:
        f.write(data)

with open(FILE_PATH + ".lock", "w") as lockfile:
    fcntl.flock(lockfile.fileno(), fcntl.LOCK_SH)
    with open(FILE_PATH, "r") as f:
        data = f.read()

There is no good way to remove the lock file unless you know all processes have left this code section. We could come up with more complicated schemes but at that point switching to the multiprocessing module is probably easier.
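If the lock-file pattern is used in more than one place, it could be wrapped in a context manager. This is only a sketch under the same assumptions as above (Unix, fcntl available). The names file_lock, write_data and read_data are hypothetical, and the lock file is deliberately left on disk, for the reason just mentioned.

```python
import contextlib
import fcntl


@contextlib.contextmanager
def file_lock(path: str, exclusive: bool = True):
    """Hold an flock on the sidecar file path + ".lock" for the with-block."""
    mode = fcntl.LOCK_EX if exclusive else fcntl.LOCK_SH
    # Truncating the lock file with "w" is harmless: it never holds data.
    with open(path + ".lock", "w") as lockfile:
        fcntl.flock(lockfile.fileno(), mode)
        try:
            yield
        finally:
            fcntl.flock(lockfile.fileno(), fcntl.LOCK_UN)


def write_data(path: str, data: str) -> None:
    with file_lock(path, exclusive=True):
        # The data file is only truncated once the exclusive lock is held.
        with open(path, "w") as f:
            f.write(data)


def read_data(path: str) -> str:
    with file_lock(path, exclusive=False):
        with open(path, "r") as f:
            return f.read()
```

Readers take a shared lock so they do not block each other, while a writer's exclusive lock blocks both readers and other writers.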

CodePudding user response:

The problem is that a global lock is shared by all threads within one process (the threads literally access the same lock object), but with multiple processes you have to pass the lock to each process explicitly. Otherwise every process ends up with its own copy, and these copies are not connected because they live in different address spaces.

See the example in the multiprocessing docs for more information on that issue.
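As a rough sketch of what passing the lock explicitly could look like: the code below assumes the workers are started through the multiprocessing module with the "fork" start method (Unix only), not through subprocess.Popen as in the question, since a multiprocessing.Lock cannot simply be handed to an unrelated python3 -c command. The function names are hypothetical.

```python
import multiprocessing


def write_then_read(lock, path, queue) -> None:
    """Worker: write and then read the file, each step under the shared lock."""
    with lock:
        with open(path, "w") as f:
            f.write("FOO")
    with lock:
        with open(path, "r") as f:
            queue.put(f.read())


def run_with_shared_lock(path: str, n_procs: int = 3) -> list:
    # Assumption: "fork" start method, so the children inherit the lock.
    ctx = multiprocessing.get_context("fork")
    lock = ctx.Lock()      # one lock object, created in the parent
    queue = ctx.Queue()    # collects what each worker read
    procs = [
        ctx.Process(target=write_then_read, args=(lock, path, queue))
        for _ in range(n_procs)
    ]
    for p in procs:
        p.start()
    # Drain the queue before joining so no worker blocks on a full pipe.
    results = [queue.get() for _ in procs]
    for p in procs:
        p.join()
    return results
```

The important part is that the same lock instance is passed in args to every process; a module-level lock created separately in each process would protect nothing.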
