I'm implementing a "gather and go" feature. I need a batch of threads to work simultaneously. It may take a while for each threading to prepare and then become ready. I want them to start at the same time once all of them are ready.
Event object in threading module can be used for it. For example I can create an event in main thread, then wait() for it in the worker threads, then set() the event in main thread. But how can I know from main thread that all worker threads are ready? Should I use event either? For example create an event for each worker thread?
Code example
from threading import Thread, Event, Lock, current_thread
from datetime import datetime
from time import sleep
from random import random
log_lock = Lock()
def log(msg):
with log_lock:
print("{} ({}) {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), current_thread().name, msg))
def worker(worker_ready_event, go_event):
log("Initializing worker")
sleep(2 random() * 3)
worker_ready_event.set()
log("Worker is ready")
go_event.wait()
log("Worker is working")
log("Worker has finished their work")
def main():
go_event = Event()
event_list = list()
for i in range(1, 6):
worker_ready_event = Event()
event_list.append(worker_ready_event)
Thread(name=f"Worker-{i}", target=worker, args=(worker_ready_event, go_event)).start()
for worker_ready_event in event_list:
worker_ready_event.wait()
log("All workers are ready, Go!")
go_event.set()
if __name__ == '__main__':
main()
Is the above code the correct way of implement it? Or is there any exiting tool for this "gather and go"? And one more question: how to you call this scenario? "Gather and go", or something else?
CodePudding user response:
threading.Barrier
is built for this. Initialize it with the number of threads that you want to synchronize. Each thread calls wait
which will wait until all threads have done the same. That final wait
releases all of them to run at once.
from threading import Thread, Lock, current_thread, Barrier
from datetime import datetime
from time import sleep
from random import random
log_lock = Lock()
def log(msg):
with log_lock:
print("{} ({}) {}".format(datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f"), current_thread().name, msg))
def worker(barrier):
log("Initializing worker")
sleep(2 random() * 3)
log("Worker is ready")
barrier.wait()
log("Worker is working")
sleep(.1)
log("Worker has finished their work")
def main():
num_threads = 5
work_barrier = Barrier(num_threads 1)
for i in range(1, num_threads 1):
Thread(name=f"Worker-{i}", target=worker, args=(work_barrier,)).start()
log("main waits")
work_barrier.wait() # waits til all workers are ready
log("All workers are ready") # this thread starts w/ the workers
if __name__ == '__main__':
main()