I wanted to create a custom Thread class that is able to propagate an exception it comes across to the main thread. My implementation is as follows:
class VerseThread(threading.Thread):
    def __init__(self, args):
        super().__init__(self, args=args)
        # self.scraper = scraper

    def run(self):
        self.exc = None
        try:
            book, abbrev, template, chapter = self.args
            self.parser.parse(book, abbrev, template, chapter)
        except ChapterNotFoundError as e:
            self.exc = e

    def join(self):
        threading.Thread.join(self)
        if self.exc:
            raise self.exc
This is supposed to run in the following method, inside a Scraper class (it's all inside a while True loop):
for book, abbrev, testament in self.books[init:end]:
    base_chapter = 1
    while True:
        threads = []
        if testament == 'ot':
            for i in range(3):
                threads.append(VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter + i)))
        else:
            for i in range(3):
                threads.append(VerseThread(args=(book, abbrev, NT_TEMPLATE, base_chapter + i)))
        try:
            for thread in threads:
                if not thread.is_alive():
                    thread.start()
            for thread in threads:
                thread.join()
            base_chapter += 3
        except ChapterNotFoundError as e:
            LOGGER.info(f"{{PROCESS {multiprocessing.current_process().pid}}} - Chapter {e.chapter} not found in {book}, exiting book...")
            break
The issue is, if I run it as presented here, I get the error assert group is None, "group argument must be None for now". However, when I run it using Thread(target=self.parse, args=(book, abbrev, OT_TEMPLATE, base_chapter + 1)) instead of VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter + i)), it works just fine, but the exception is of course still there. What's wrong with my code? How can I get rid of this error?
EDIT: Upon further testing, it seems that what I'm trying to do works fine when I use thread.run() instead of thread.start(), but then only one thread is being used, which is a problem. This, however, means that the error must be in the start() method, but I've no idea what to do.
CodePudding user response:
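You have several errors. First, when you call the parent initializer through super(), as in super().__init__(self, args=args), you do not pass self explicitly as an argument. super() already binds self, so the extra self is taken as the first positional parameter of Thread.__init__, which is group, and that is what triggers the "group argument must be None for now" assertion. Second, to handle any possible thread-initializer arguments, your signature for this method should just be as follows: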
class VerseThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
    ...  # rest of the code omitted
But since your __init__ method does not do anything but call the parent's __init__ method with any passed arguments, there is now no need to even override this method.
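To make the first point concrete, here is a minimal sketch of why the original call fails. The class names BrokenThread and FixedThread are hypothetical and exist only for this illustration; it also assumes Python is run without the -O flag, since the check in Thread.__init__ is a plain assert.

```python
import threading


class BrokenThread(threading.Thread):
    def __init__(self, args):
        # Bug: super() already supplies self, so this extra self is bound
        # to the first positional parameter of Thread.__init__, i.e. group.
        super().__init__(self, args=args)


class FixedThread(threading.Thread):
    def __init__(self, *args, **kwargs):
        # Forward everything unchanged; self is passed implicitly.
        super().__init__(*args, **kwargs)


try:
    BrokenThread(args=(1,))
    broken_error = None
except AssertionError as e:
    # The failure happens at construction time, before start() is called.
    broken_error = str(e)

# Constructing the fixed variant succeeds.
fixed = FixedThread(target=print, args=("ok",))
```

Note that the assertion is raised while the thread object is being constructed, so the problem lies in __init__ rather than in start().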
Finally, the attributes that you are interested in are not args but rather _args and _kwargs (in case keyword arguments are specified). Also, you have specified self.parser, but I do not see where that attribute has been set.
import threading

class ChapterNotFoundError(Exception):
    pass

class VerseThread(threading.Thread):
    def run(self):
        self.exc = None
        try:
            book, abbrev, template, chapter = self._args
            self.parser.parse(book, abbrev, template, chapter)
        except ChapterNotFoundError as e:
            self.exc = e

    def join(self):
        threading.Thread.join(self)  # Or: super().join()
        if self.exc:
            raise self.exc
# Inside the Scraper method, unchanged from the question:
for book, abbrev, testament in self.books[init:end]:
    base_chapter = 1
    while True:
        threads = []
        if testament == 'ot':
            for i in range(3):
                threads.append(VerseThread(args=(book, abbrev, OT_TEMPLATE, base_chapter + i)))
        else:
            for i in range(3):
                threads.append(VerseThread(args=(book, abbrev, NT_TEMPLATE, base_chapter + i)))
        try:
            for thread in threads:
                if not thread.is_alive():
                    thread.start()
            for thread in threads:
                thread.join()
            base_chapter += 3
        except ChapterNotFoundError as e:
            LOGGER.info(f"{{PROCESS {multiprocessing.current_process().pid}}} - Chapter {e.chapter} not found in {book}, exiting book...")
            break
Improvement
Accessing quasi-private attributes, such as self._args, is potentially dangerous and should be avoided.
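One way to avoid touching _args at all is to have the subclass keep its own copy of whatever it needs. The following is only a minimal sketch: the class name ArgsThread, the attributes book, chapter and result, and the placeholder string built in run() are all hypothetical and stand in for the real parsing call.

```python
import threading


class ArgsThread(threading.Thread):
    def __init__(self, book, chapter):
        super().__init__()
        # Store the inputs on public attributes owned by this subclass
        # instead of reading Thread's internal _args tuple later.
        self.book = book
        self.chapter = chapter
        self.result = None

    def run(self):
        # Placeholder for the real work (e.g. a parse call).
        self.result = f"{self.book} {self.chapter}"


t = ArgsThread("Genesis", 1)
t.start()
t.join()
```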
I can see the value of creating a subclass of Thread that will catch exceptions in the "worker" function it is to execute and then "propagate" them back to the main thread when it joins the thread. But I believe such a class should be general purpose and work with any type of worker function. In general, I don't like to have application-specific code (business logic) in a threading.Thread or multiprocessing.Pool subclass. I instead prefer having my business logic coded within a function or class method(s) that can then be used in multithreading, multiprocessing or serial processing as you see fit. The following is how I would code the Thread subclass (I have named it PropogateExceptionThread, but choose whatever name you wish) and how I might use it:
import threading

class PropogateExceptionThread(threading.Thread):
    def run(self):
        self.exc = None
        try:
            # Runs the target passed to the constructor, if any
            super().run()
        except Exception as e:
            # Remember the exception so join() can re-raise it
            self.exc = e

    def join(self):
        super().join()
        if self.exc:
            raise self.exc

def worker(x):
    if x < 10 or x > 20:
        raise ValueError(f'Bad value for argument x = {x}')

t = PropogateExceptionThread(target=worker, args=(1,))
t.start()
try:
    t.join()
except Exception as e:
    print('The thread raised an exception:', e)
Prints:
The thread raised an exception: Bad value for argument x = 1
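For completeness, here is a rough sketch of how the batch-of-three loop from the question could sit on top of such a general-purpose class. Everything application-specific is an assumption made for the example: parse_chapter, its last_chapter parameter, the parsed list and scrape_book are hypothetical stand-ins, and ChapterNotFoundError is redefined with a chapter attribute because the question's version is not shown. The class is repeated under the name PropagatingThread so the block runs on its own, and join() additionally accepts the optional timeout that Thread.join() takes.

```python
import threading


class ChapterNotFoundError(Exception):
    """Hypothetical stand-in for the exception used in the question."""

    def __init__(self, chapter):
        super().__init__(f"chapter {chapter} not found")
        self.chapter = chapter


class PropagatingThread(threading.Thread):
    def run(self):
        self.exc = None
        try:
            super().run()
        except Exception as e:
            self.exc = e

    def join(self, timeout=None):
        super().join(timeout)
        # getattr guards against join(timeout) returning before run() began.
        exc = getattr(self, "exc", None)
        if exc:
            raise exc


parsed = []  # chapters that were "parsed" successfully
lock = threading.Lock()


def parse_chapter(book, chapter, last_chapter=7):
    # Hypothetical worker: pretend every book has exactly last_chapter chapters.
    if chapter > last_chapter:
        raise ChapterNotFoundError(chapter)
    with lock:
        parsed.append(chapter)


def scrape_book(book):
    """Parse chapters three at a time; return the first missing chapter seen."""
    base_chapter = 1
    while True:
        threads = [
            PropagatingThread(target=parse_chapter, args=(book, base_chapter + i))
            for i in range(3)
        ]
        for t in threads:
            t.start()
        try:
            for t in threads:
                t.join()
            base_chapter += 3
        except ChapterNotFoundError as e:
            return e.chapter


missing = scrape_book("Genesis")
```

As in the question's loop, the first join() that raises ends the batch, so any later threads in that batch are left to finish on their own without being joined.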