Home > Net >  "When" loop in asynchronous Python
"When" loop in asynchronous Python

Time:09-28

This isn't as much of a question as something I'm interested in.

I do quite a bit of asynchronous coding in Python, and there's a bit of code that I'm frequently writing over and over while I'm waiting for threads to stop (if I'm trying to exit cleanly).

while not class_containing_threads.stopped:
    pass
else:
     do_something()
     do_something_else()
     do_some_other_thing()

Although I'm sure there's a nice decorator that one can write to make this happen, I'm not too sure how I would go about writing it without ultimately making my code more complicated than it needs to be.

Basically, I wish there were something along the lines of:

when condition:
    do_something()

where the thread is effectively halted while we wait for some event to occur.

To demonstrate what I mean, here's some working code that shows how much I actually end up writing the same thing over and over

import threading
import random
import time


class ClassContainingThreads:
    def __init__(self):
        # Just stating what stuff can be found here
        self._coordinating_thread = None
        self._worker_thread_1 = None
        self._worker_thread_2 = None
        self._worker_thread_3 = None
        self._stopping = False
        self._stopped = False

    def run(self):
        # Main method to get everything running
        self._coordinating_thread = threading.Thread(target=self._run)
        self._coordinating_thread.start()

    def stop(self):
        # Used to stop everything
        self._stopping = True

    @property
    def stopped(self):
        # Lets you know when things have stopped
        return self._stopped

    @property
    def all_workers_running(self):
        # Lets you know whether all the workers are running
        return self._all_workers_are_alive()

    def _run(self):
        # Coordinating thread getting worker threads to start
        self._worker_thread_1 = threading.Thread(
            target=self._important_function_1)
        self._worker_thread_2 = threading.Thread(
            target=self._important_function_2)
        self._worker_thread_3 = threading.Thread(
            target=self._important_function_3)
        self._worker_thread_1.start()
        self._worker_thread_2.start()
        self._worker_thread_3.start()
        # Coincidentally, the block appears here
        while not self._stopping:
            pass
        else:
            while self._any_workers_are_alive():
                pass
            else:
                self._stopping = False
                self._stopped = True

    def _important_function_1(self):
        print(f'Thread 1 started')
        # Coincidentally, the block appears here
        while not self._stopping:
            pass
        else:
            print('Thread 1 received stop signal')
            # Emulating some process that takes some unknown time to stop
            delay_long = random.random() * 5
            delay_start = time.time()
            while not (time.time() - delay_start) > delay_long:
                pass
            else:
                print(f'Thread 1 stopped')

    def _important_function_2(self):
        print(f'Thread 2 started')
        # Coincidentally, the block appears here
        while not self._stopping:
            pass
        else:
            print('Thread 2 received stop signal')
            # Emulating some process that takes some unknown time to stop
            delay = random.random() * 5
            delay_start = time.time()
            while not (time.time() - delay_start) > delay:
                pass
            else:
                print(f'Thread 2 stopped')

    def _important_function_3(self):
        print(f'Thread 3 started')
        # Coincidentally, the block appears here
        while not self._stopping:
            pass
        else:
            print('Thread 3 received stop signal')
            # Emulating some process that takes some unknown time to stop
            delay = random.random() * 5
            delay_start = time.time()
            while not (time.time() - delay_start) > delay:
                pass
            else:
                print(f'Thread 3 stopped')

    def _any_workers_are_alive(self):
        # Check whether any workers are alive
        if (self._worker_thread_1.is_alive() or
                self._worker_thread_2.is_alive() or
                self._worker_thread_3.is_alive()):
            return True
        else:
            return False

    def _all_workers_are_alive(self):
        # Check whether all workers are alive
        if (self._worker_thread_1.is_alive() and
                self._worker_thread_2.is_alive() and
                self._worker_thread_3.is_alive()):
            return True
        else:
            return False


if __name__ == '__main__':
    # Just booting everything up
    print('Program started')
    class_containing_threads = ClassContainingThreads()
    class_containing_threads.run()
    # Block I'm interested in appears here
    while not class_containing_threads.all_workers_running:
        pass
    else:
        # and here
        while not input("Type 'exit' to exit > ") == "exit":
            pass
        else:
            class_containing_threads.stop()
            # and here
            while not class_containing_threads.stopped:
                pass
            else:
                print('Program stopped')
                exit()  # I know this is pointless here

Also, critiques are welcome.

CodePudding user response:

The pattern of repeatedly checking a flag is a form of busy wait. This is an extremely wasteful pattern, as the task checking the flag will do so very, very often.

Concrete alternatives depend on the concurrency pattern used, but usually come in the form of signals, events or locks – these are generally known as "synchronisation primitives".

For example, threading provides a threading.Event that can be "waited for" and "triggered". The desired operation when condition: is simply event.wait() – this automatically pauses the current thread until the event is triggered. Another thread can trigger this condition via event.set().

CodePudding user response:

Thanks to the feedback, I've rewritten the code snippet into something that uses the threading.Thread.join() method and threading.Event object. It's much simpler now, and hopefully doesn't involve any unintentional busy waiting.

import threading
import random
import time


class ClassContainingThreads:
    def __init__(self, blocking_event):
        # Just stating what stuff can be found here
        self._blocking_event = blocking_event
        self._coordinating_thread = None
        self._worker_thread_1 = None
        self._worker_thread_2 = None
        self._worker_thread_3 = None
        self._stopping = False
        self._stopped = False

    def run(self):
        # Main method to get everything running
        self._coordinating_thread = threading.Thread(target=self._run)
        self._coordinating_thread.start()

    def stop(self):
        # Used to stop everything
        self._stopping = True

    @property
    def stopped(self):
        return self._stopped

    def _run(self):
        # Coordinating thread getting worker threads to start
        self._worker_thread_1 = threading.Thread(
            target=self._important_function_1)
        self._worker_thread_2 = threading.Thread(
            target=self._important_function_2)
        self._worker_thread_3 = threading.Thread(
            target=self._important_function_3)

        # Start the workers
        self._worker_thread_1.start()
        self._worker_thread_2.start()
        self._worker_thread_3.start()

        # Let main_thread continue when workers have started
        self._blocking_event.set()

        # Wait for workers to complete
        self._worker_thread_1.join()
        self._worker_thread_2.join()
        self._worker_thread_3.join()

        # Once all threads are dead
        self._stopping = False
        self._stopped = True
        self._blocking_event.set()

    def _important_function_1(self):
        print(f'Thread 1 started')
        # Emulating some job being done
        while not self._stopping:
            pass
        else:
            print('Thread 1 received stop signal')
            # Emulating some process that takes some unknown time to stop
            delay_long = random.random() * 5
            delay_start = time.time()
            while not (time.time() - delay_start) > delay_long:
                pass
            else:
                print(f'Thread 1 stopped')

    def _important_function_2(self):
        print('Thread 2 started')
        # Emulating some job being done
        while not self._stopping:
            pass
        else:
            print('Thread 2 received stop signal')
            # Emulating some process that takes some unknown time to stop
            delay = random.random() * 5
            delay_start = time.time()
            while not (time.time() - delay_start) > delay:
                pass
            else:
                print(f'Thread 2 stopped')

    def _important_function_3(self):
        print('Thread 3 started')
        # Emulating some job being done
        while not self._stopping:
            pass
        else:
            print('Thread 3 received stop signal')
            # Emulating some process that takes some unknown time to stop
            delay = random.random() * 5
            delay_start = time.time()
            while not (time.time() - delay_start) > delay:
                pass
            else:
                print(f'Thread 3 stopped')


if __name__ == '__main__':
    # Just booting everything up
    print('Program started')
    blocking_event = threading.Event()
    class_containing_threads = ClassContainingThreads(blocking_event)
    class_containing_threads.run()
    blocking_event.wait()
    while not input("Type 'exit' to exit > ") == "exit":
        pass
    else:
        class_containing_threads.stop()
        blocking_event.wait()
        print('Program stopped')
        exit()  # I know this is pointless here
  • Related