PyTorch DataPipes are a new in-place dataset loader for large data that can be streamed into PyTorch models. For reference, see:
- Official Doc: https://pytorch.org/data/main/tutorial.html
- A crash-course post explaining the usage https://sebastianraschka.com/blog/2022/datapipes.html
Given a myfile.csv file, initialised as the csv_file variable in code, the file looks like this:
imagefile,label
train/0/16585.png,0
train/0/56789.png,0
...
In the example code that uses datapipes to read the csv_file and then create an iterable dataset with torchdata.datapipes, we see something like:
from torchdata import datapipes as dp

def build_data_pipe(csv_file, transform, len=1000, batch_size=32):
    new_dp = dp.iter.FileOpener([csv_file])
    new_dp = new_dp.parse_csv(skip_lines=1)
    # returns tuples like ('train/0/16585.png', '0')
    new_dp = new_dp.shuffle(buffer_size=len)
    ...
    # More code that returns the `new_dp` variable, which looks like some
    # lazy-loaded, not-yet-evaluated/materialized Iterable object.
    return new_dp
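For context, the resulting datapipe is then consumed like a regular iterable dataset. A rough sketch of that usage (assuming the elided parts of build_data_pipe above are filled in):
from torch.utils.data import DataLoader

datapipe = build_data_pipe(csv_file="myfile.csv", transform=None)
dataloader = DataLoader(dataset=datapipe)  # DataLoader accepts an IterDataPipe as the dataset
for batch in dataloader:
    ...  # each batch is pulled lazily from the streamed datapipe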
If we look at each step and what is returned to new_dp, we see:
>>> from torchdata import datapipes as dp
# The first call initializes a FileOpenerIterDataPipe object
>>> new_dp = dp.iter.FileOpener(["myfile.csv"])
>>> new_dp
FileOpenerIterDataPipe
# After that, the DataPipes API allows some kind of overwriting/subclassing
# by calling a partial function, e.g.
>>> new_dp.parse_csv
functools.partial(<function IterDataPipe.register_datapipe_as_function.<locals>.class_function at 0x213123>, <class 'torchdata.datapipes.iter.util.plain_text_reader.CSVParserIterDataPipe'>, False, FileOpenerIterDataPipe)
>>> new_dp = new_dp.parse_csv(skip_lines=1)
>>> new_dp
CSVParserIterDataPipe
It looks like new_dp.parse_csv(skip_lines=1) is trying to do a new initialization through a MixIn between CSVParserIterDataPipe and FileOpenerIterDataPipe, but I'm not exactly sure what's happening.
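My rough guess (which may be wrong) is that the functional call is just sugar for constructing a new datapipe that wraps the previous one, i.e. something approximately like:
>>> from torchdata.datapipes.iter import CSVParser
>>> # approximately what new_dp.parse_csv(skip_lines=1) seems to expand to:
>>> new_dp = CSVParser(new_dp, skip_lines=1)  # the parser wraps the previous datapipe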
To fully get a working datapipe, there's a whole bunch of other new_dp = new_dp.xxx() calls to chain. And my questions are:
Q1. Can't the DataPipe be initialized in a non-sequential way? (P/S: This didn't work as expected)
from torchdata import datapipes as dp

class MyDataPipe(dp.iterGenericDataPipe):
    def __init__(self, csv_file, skip_lines=1, shuffle_buffer=1000):
        super().__init__([csv_file])
        self.parse_csv(skip_lines=1)
        self.new_dp.shuffle(buffer_size=shuffle_buffer)
But given that we have to overwrite the new_dp, it seems like we might have to do something like:
from torchdata import datapipes as dp

class MyDataPipe(dp.iterGenericDataPipe):
    def __init__(self, csv_file, skip_lines=1, shuffle_buffer=1000):
        super().__init__([csv_file])
        self = self.parse_csv(skip_lines=1)
        self = self.new_dp.shuffle(buffer_size=shuffle_buffer)
Q2. Is self = self.xxx() an anti-pattern in Python?
Q3. How else to initialize a DataPipe if we don't do self = self.xxx()?
CodePudding user response:
It looks like you're trying to chain together a series of torch DataPipes, namely:
- FileOpener / open_files
- CSVParser / parse_csv
- Shuffler / shuffle
The official torchdata tutorial at https://pytorch.org/data/0.4/tutorial.html does so using a function (e.g. def custom_data_pipe()), but you seem to prefer a class-based approach (e.g. class CustomDataPipe). Let's call this a DataPipeLine.
An additional complication is that you're trying to apply an inheritance-style torch.utils.data.Dataset to a composition-style torchdata.datapipes.iter.IterDataPipe. Presumably, the reason you're doing this is to create a configurable 'dataset', e.g. one that can skip N lines, has a shuffle buffer of B, etc. Now there's a few things wrong about this, but let's go with it.
Bad example (please don't use)
from torchdata.datapipes import functional_datapipe
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

@functional_datapipe("csv_processor_and_batcher")
class MyDataPipeLine(IterDataPipe):
    def __init__(
        self,
        source_datapipe: IterDataPipe[str],
        skip_lines: int = 1,
        shuffle_buffer: int = 1000,
    ):
        super().__init__()
        self.source_datapipe: IterDataPipe[str] = source_datapipe
        self.chained_datapipe = (
            self.source_datapipe.open_files()
            .parse_csv(skip_lines=skip_lines)
            .shuffle(buffer_size=shuffle_buffer)
        )

    def __iter__(self):
        for item in self.chained_datapipe:
            yield item
And the way you would use it is:
import torch

dp = IterableWrapper(iterable=["file1.csv", "file2.csv"])
dp_custom = dp.csv_processor_and_batcher()
dataloader = torch.utils.data.DataLoader(dataset=dp_custom)
for batch in dataloader:
    print(batch)
Now to be honest, this is really not recommended (and I'm half regretting writing up this answer already) because the reason torchdata exists is to have compositional DataPipes, i.e. each DataPipe should be specialized to do one thing only rather than many things. What you probably want is to 1) Read up more on composition and pipe-ing:
- https://realpython.com/inheritance-composition-python
- https://pandas.pydata.org/pandas-docs/version/1.5/reference/api/pandas.DataFrame.pipe.html
Then 2) write something like the below. I'm using a LightningDataModule not only because it's cool, but because it's closer to the thing you actually want to subclass:
Better example
from typing import Optional
from torch.utils.data import DataLoader2
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
import pytorch_lightning as pl

class MyDataPipeModule(pl.LightningDataModule):
    def __init__(
        self,
        csv_files: list[str],
        skip_lines: int = 1,
        shuffle_buffer: int = 1000,
    ):
        super().__init__()
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines
        self.shuffle_buffer: int = shuffle_buffer
        # Run the datapipe composition setup()
        self.setup()

    def setup(self, stage: Optional[str] = None) -> IterDataPipe:
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv(skip_lines=self.skip_lines)
            .shuffle(buffer_size=self.shuffle_buffer)
        )
        return self.dp_chained_datapipe

    def train_dataloader(self) -> DataLoader2:
        return DataLoader2(dataset=self.dp_chained_datapipe)
Usage:
datamodule = MyDataPipeModule(csv_files=["file1.csv", "file2.csv"])
model: pl.LightningModule = MyModel()
trainer = pl.Trainer(accelerator="auto", max_epochs=3)
trainer.fit(model=model, datamodule=datamodule)
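If you just want to sanity-check the pipeline without a Trainer, a quick sketch (assuming file1.csv and file2.csv exist in the format shown earlier) would be:
datamodule = MyDataPipeModule(csv_files=["file1.csv", "file2.csv"])
for batch in datamodule.train_dataloader():
    print(batch)  # inspect what comes out of the chained datapipe
    break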
Maybe not quite the answer you expected, but I'd encourage you to experiment a bit more. The key bit is to switch your mindset from inheritance (subclassing) to composition (chaining/pipe-ing).
P.S. Gonna throw in a shameless plug on some tutorials I wrote at https://zen3geo.readthedocs.io/en/v0.4.0/walkthrough.html. It's a bit geospatial specific, but might be helpful to get a feel of the DataPipe-way of working. Good luck!