Home > Back-end >  Passing instance of type self to method as argument
Passing instance of type self to method as argument

Time:11-28

I would like to pass an instance of PipelineTask to PipelineTask.add however when I try I get a NameError which mentions PipelineTask is not defined.

I believe this is because PipelineTask is only bound after PipelineTask.__init__() is called.

class Task(BaseModel, abc.ABC):
    id: str

    @abc.abstractmethod
    async def run(self):
        pass


class PipelineTask(Task):
    @abc.abstractmethod
    async def run(self):
        pass

    def add(self, task: PipelineTask):
        ## TODO: how do a pass a instance of self here?
        next = self
        if self.next == None:
            self.next = task
            next = self.next
        else:
            current = self.next
            while current != None:
                current = current.next
            next = current
            current.next = next
        return self

class Pipeline(BaseModel):
    """
    Pipeline execute a sequence of tasks

    ...
    init = PipelineTask(id=0)
    pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
    pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
    pipeline.run()
    ...
    """

    # The pipelines innitial task
    init: PipelineTask

    async def run(self):
        await self.init.run()
        has_next = self.init.next != None
        next = self.init
        while has_next:
            next = next.next
            await next.run()
            has_next = next.next != None

    ## Adds a task to the end of the pipeline
    async def add(self, next: PipelineTask):
        """add Task to the end of the pipeline"""
        self.init.add(next)


class StdoutTask(PipelineTask):
    async def run(self):
        print(f"[Worker {self.id}] testing...")


async def test_create_pipeline():
    tasks = (
        StdoutTask(id=1, next=None)
        .add(StdoutTask(id=2, next=None))
        .add(StdoutTask(id=3, next=None))
    )
    pipeline = Pipeline(init=tasks)
    await pipeline.run()

Example usage:

class StdoutTask(PipelineTask):
    async def run(self):
        print(f"[Worker {self.id}] testing...")

@pytest.mark.asyncio
async def test_create_pipeline():
    tasks = StdoutTask(id=1).add(StdoutTask(id=2)).add(StdoutTask(id=3))
    pipeline = Pipeline(init=tasks)
    await pipeline.run()
    pass

I tried 'un'specifying task's type, but then task is missing the next attributes (e.g. AttributeError: 'XXXX' object has no attributed next)

def add(self, task):
    ...

I have also tried modifying task.__class__ = PipelineTask which adds additional methods but not attributes.

Below is a single file reproducible example

from pydantic import BaseModel
import abc
import asyncio


class Task(BaseModel, abc.ABC):
    id: str

    @abc.abstractmethod
    async def run(self):
        pass


class PipelineTask(Task):
    @abc.abstractmethod
    async def run(self):
        pass

    def add(self, task: PipelineTask):
        ## TODO: how do a pass a instance of self here?
        next = self
        if self.next == None:
            self.next = task
            next = self.next
        else:
            current = self.next
            while current != None:
                current = current.next
            next = current
            current.next = next
        return self


class Pipeline(BaseModel):
    """
    Pipeline execute a sequence of tasks

    ...
    init = PipelineTask(id=0)
    pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
    pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
    pipeline.run()
    ...
    """

    # The pipelines innitial task
    init: PipelineTask

    async def run(self):
        await self.init.run()
        has_next = self.init.next != None
        next = self.init
        while has_next:
            next = next.next
            await next.run()
            has_next = next.next != None

    ## Adds a task to the end of the pipeline
    async def add(self, next: PipelineTask):
        """add Task to the end of the pipeline"""
        self.init.add(next)


class StdoutTask(PipelineTask):
    async def run(self):
        print(f"[Worker {self.id}] testing...")


async def test_create_pipeline():
    tasks = (
        StdoutTask(id=1, next=None)
        .add(StdoutTask(id=2, next=None))
        .add(StdoutTask(id=3, next=None))
    )
    pipeline = Pipeline(init=tasks)
    await pipeline.run()

Solution

Use getattr

      def add(self, task: "PipelineTask"):
          next = getattr(self, "next", None)
          if self.next == None:
              self.next = task
              next = self.next
          else:
              current = getattr(self, "next", None)
              while current != None:
                  current = getattr(current, "next", None)
              next = getattr(current, "next", None)
              current.next = next
          return self

CodePudding user response:

The issue is likely where you have if self.next == None:; just set self.next inside __init__ or instead refer to it as getattr(self, "next", None)

>> class Test():
...     def add(self):
...         print(getattr(self, "missing", None))
... 
>>> t = Test()
>>> t.add()
None
>>> t.missing = "not missing"
>>> t.add()
not missing
  • Related