Home > Software design >  Instantiate threading within a class
Instantiate threading within a class

Time:10-01

I have a class within a class and want to activate threading capabilities in the second class. Essentially, the script below is a reproducible template of my proper project.

When I use @threading I get that showit is not iterable, so the tp.map thinks I do not have a list.

However, when I run:

if __name__ == '__main__':
    tp = ThreadPoolExecutor(5)
    print(tp.map(testit(id_str).test_first, id_int))
    for values in tp.map(testit(id_str).test_first, id_int):
        values

I get no issues, besides that I want the expected output to print out each number in the list. However, I wanted to achieve this within the class.

Something like the following:

from concurrent.futures import ThreadPoolExecutor
from typing import List

id_str = ['1', '2', '3', '4', '5']
id_int = [1, 2, 3, 4, 5]

def threaded(fn, pools=10):
    tp = ThreadPoolExecutor(pools)
    def wrapper(*args):
        return tp.map(fn, *args)  # returns Future object
    return wrapper

class testit:
    def __init__(self, some_list: List[str]) -> None:
        self._ids = some_list
        print(self._ids)

    def test_first(self, some_id: List[int]) -> None:
        print(some_id)

class showit(testit):
    def __init__(self, *args):
        super(showit, self).__init__(*args)
    
    @threaded
    def again(self):
        global id_int
        for values in self.test_first(id_int):
            print(values)

a = showit(id_str)
print(a.again())

Error:

  File "test_1.py", line 32, in <module>
    print(a.again())
  File "test_1.py", line 10, in wrapper
    return tp.map(fn, *args)  # returns Future object
  File "/Users/usr/opt/anaconda3/lib/python3.8/concurrent/futures/_base.py", line 600, in map
    fs = [self.submit(fn, *args) for args in zip(*iterables)]
TypeError: 'showit' object is not iterable

Expected output:

1
2
3
4
5

CodePudding user response:

Exception reason:

When you say a.again(), Python creates a method object from the function again defined in your class. It's the behavior of a descriptor.

This means The first parameter of the again is gonna fill with the reference to the newly created showit object.

See the source code of the map method which you called in line tp.map(fn, *args):

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        ...
        ...
        fs = [self.submit(fn, *args) for args in zip(*iterables)]
        ...
        ...

Look at the interested line, it's going to unpack iterables which currently is tuple of single item which is the instance of the showit class and pass it to zip function, zip accepts iterables and your showit instance is not. Too confirm add print statement to wrapper function:

def threaded(fn, pools=10):
    tp = ThreadPoolExecutor(pools)

    def wrapper(*args):
        print(args)
        return tp.map(fn, *args)  # returns Future object

    return wrapper

How to fix it?

Actually what you're doing is a bit vague for me. I think maybe you don't need concurrent.futures package here. If your again method is supposed to run on it's own thread, you can do:

import threading

id_str = ["1", "2", "3", "4", "5"]
id_int = [1, 2, 3, 4, 5]


def threaded(fn):
    def wrapper(self):
        threading.Thread(target=fn, args=(self,)).start()

    return wrapper


class testit:
    def __init__(self, some_list):
        self._ids = some_list

    def test_first(self, some_id: list[int]):
        return some_id


class showit(testit):
    @threaded
    def again(self):
        for values in self.test_first(id_int):
            print(values)


a = showit(id_str)
a.again()

CodePudding user response:

Updated answer!

Based on your comment I believe this code should do, what you want (the problem is, that a method inside a class will always need to be called with self as the first parameter and that needs to be fed to tp.map as well):

from concurrent.futures import ThreadPoolExecutor
from typing import List

id_str = ['1', '2', '3', '4', '5']
id_int = [1, 2, 3, 4, 5]
tp = ThreadPoolExecutor(10)

def threaded(fn):
    def wrapper(self,lst: List[int]):
        return tp.map(fn,[self]*len(lst),lst)
    return wrapper
    
class testit:
    def __init__(self, some_list: List[str]) -> None:
        self._ids = some_list
        print(self._ids)

    @threaded
    def test_first(self,some_id: int) -> int:
        return -some_id

class showit(testit):
    def __init__(self, *args):
        super(showit, self).__init__(*args)
    
    def again(self):
        global id_int
        for values in self.test_first(id_int):
            print(repr(values))

a = showit(id_str)
a.again()

But be aware: the original test_first method is receiving an int as as a parameter and the decorator is making that into a List[int] and takes care, that the original test_first method is called for each element of the list and doing that in parallel threads in the thread pool tp.

You can see that code in action at https://onlinegdb.com/gJxvgniGQ and it outputs:

['1', '2', '3', '4', '5']
-1
-2
-3
-4
-5
  • Related