Is this the correct way to implement "gather and go" in Python multithreading? And how

Time:07-04

I'm implementing a "gather and go" feature. I need a batch of threads to work simultaneously. Each thread may take a while to prepare before it is ready, and I want all of them to start at the same time once every one of them is ready.

The Event object in the threading module can be used for this. For example, I can create an event in the main thread, wait() for it in the worker threads, and then set() it from the main thread. But how can the main thread know that all worker threads are ready? Should I use events for that too, for example by creating one event per worker thread?

Code example

from threading import Thread, Event, Lock, current_thread
from datetime import datetime
from time import sleep
from random import random

log_lock = Lock()


def log(msg):
    with log_lock:
        print("{} ({}) {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), current_thread().name, msg))


def worker(worker_ready_event, go_event):
    log("Initializing worker")
    sleep(2 + random() * 3)
    worker_ready_event.set()
    log("Worker is ready")
    go_event.wait()
    log("Worker is working")
    log("Worker has finished their work")


def main():
    go_event = Event()
    event_list = list()
    for i in range(1, 6):
        worker_ready_event = Event()
        event_list.append(worker_ready_event)
        Thread(name=f"Worker-{i}", target=worker, args=(worker_ready_event, go_event)).start()

    for worker_ready_event in event_list:
        worker_ready_event.wait()
    log("All workers are ready, Go!")
    go_event.set()


if __name__ == '__main__':
    main()

Is the above code a correct way to implement this? Or is there an existing tool for "gather and go"? One more question: what is this scenario called? "Gather and go", or something else?

CodePudding user response:

threading.Barrier is built for this. Initialize it with the number of threads that you want to synchronize. Each thread calls wait(), which blocks until all the other threads have called it too. The final wait() call releases all of them to run at once.

from threading import Thread, Lock, current_thread, Barrier
from datetime import datetime
from time import sleep
from random import random

log_lock = Lock()


def log(msg):
    with log_lock:
        print("{} ({}) {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), current_thread().name, msg))


def worker(barrier):
    log("Initializing worker")
    sleep(2 + random() * 3)
    log("Worker is ready")
    barrier.wait()
    log("Worker is working")
    sleep(.1)
    log("Worker has finished their work")


def main():
    num_threads = 5
    work_barrier = Barrier(num_threads + 1)  # workers plus the main thread
    for i in range(1, num_threads + 1):
        Thread(name=f"Worker-{i}", target=worker, args=(work_barrier,)).start()
    log("main waits")
    work_barrier.wait()  # waits until all workers are ready
    log("All workers are ready")  # this thread is released together with the workers

if __name__ == '__main__':
    main()
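
If the main thread does not need to take part in the rendezvous, a variation is to size the barrier for the workers only and let the barrier run a callback at the moment of release. The following is a minimal sketch, not a definitive implementation: the names gather_and_go, on_release, release_times and start_times are hypothetical, the short random sleep stands in for real preparation work, and the 5 second timeout is an arbitrary choice. It relies on the action and timeout parameters of threading.Barrier.

```python
from threading import Barrier, Lock, Thread
from time import monotonic, sleep
from random import random


def gather_and_go(num_workers=5):
    """Start num_workers threads that all wait at one barrier before working."""
    release_times = []   # filled by the barrier action (hypothetical helper state)
    start_times = {}     # worker id -> time at which it passed the barrier
    lock = Lock()

    def on_release():
        # The barrier calls this exactly once per cycle, in one of the
        # waiting threads, just before all of them are released.
        release_times.append(monotonic())

    barrier = Barrier(num_workers, action=on_release)

    def worker(worker_id):
        sleep(random() * 0.2)    # simulated preparation time
        # Raises BrokenBarrierError if the others do not arrive within 5 s.
        barrier.wait(timeout=5)
        with lock:
            start_times[worker_id] = monotonic()

    threads = [Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return release_times, start_times


if __name__ == "__main__":
    released, started = gather_and_go()
    print("barrier released once:", len(released) == 1)
    print("workers started:", sorted(started))
```

Unlike the code above, the barrier here counts only the workers, so the main thread is free to do other things and simply joins the threads at the end. The timeout keeps a worker that never becomes ready from blocking the others forever: the waiting threads get a BrokenBarrierError instead.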