I would like to pass an instance of PipelineTask
to PipelineTask.add
however when I try I get a NameError
which mentions PipelineTask is not defined.
I believe this is because PipelineTask
is only bound after PipelineTask.__init__()
is called.
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
Example usage:
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
@pytest.mark.asyncio
async def test_create_pipeline():
tasks = StdoutTask(id=1).add(StdoutTask(id=2)).add(StdoutTask(id=3))
pipeline = Pipeline(init=tasks)
await pipeline.run()
pass
I tried 'un'specifying task
's type, but then task
is missing the next
attributes (e.g. AttributeError: 'XXXX' object has no attributed next
)
def add(self, task):
...
I have also tried modifying task.__class__ = PipelineTask
which adds additional methods but not attributes.
Below is a single file reproducible example
from pydantic import BaseModel
import abc
import asyncio
class Task(BaseModel, abc.ABC):
id: str
@abc.abstractmethod
async def run(self):
pass
class PipelineTask(Task):
@abc.abstractmethod
async def run(self):
pass
def add(self, task: PipelineTask):
## TODO: how do a pass a instance of self here?
next = self
if self.next == None:
self.next = task
next = self.next
else:
current = self.next
while current != None:
current = current.next
next = current
current.next = next
return self
class Pipeline(BaseModel):
"""
Pipeline execute a sequence of tasks
...
init = PipelineTask(id=0)
pipeline = Pipeline(init=PrepareDataPipelineTask(id='prepare'))
pipeline.add(ExecutePipelineTask(id='execute')).add(CollectResultsPipelineTask(id='collect'))
pipeline.run()
...
"""
# The pipelines innitial task
init: PipelineTask
async def run(self):
await self.init.run()
has_next = self.init.next != None
next = self.init
while has_next:
next = next.next
await next.run()
has_next = next.next != None
## Adds a task to the end of the pipeline
async def add(self, next: PipelineTask):
"""add Task to the end of the pipeline"""
self.init.add(next)
class StdoutTask(PipelineTask):
async def run(self):
print(f"[Worker {self.id}] testing...")
async def test_create_pipeline():
tasks = (
StdoutTask(id=1, next=None)
.add(StdoutTask(id=2, next=None))
.add(StdoutTask(id=3, next=None))
)
pipeline = Pipeline(init=tasks)
await pipeline.run()
Solution
Use getattr
def add(self, task: "PipelineTask"):
next = getattr(self, "next", None)
if self.next == None:
self.next = task
next = self.next
else:
current = getattr(self, "next", None)
while current != None:
current = getattr(current, "next", None)
next = getattr(current, "next", None)
current.next = next
return self
CodePudding user response:
The issue is likely where you have if self.next == None:
; just set self.next
inside __init__
or instead refer to it as getattr(self, "next", None)
>> class Test():
... def add(self):
... print(getattr(self, "missing", None))
...
>>> t = Test()
>>> t.add()
None
>>> t.missing = "not missing"
>>> t.add()
not missing