Given a myfile.csv
file that looks like:
imagefile,label
train/0/16585.png,0
train/0/56789.png,0
The goal is to create a Pytorch DataLoader
that when looped return 2x the data points, e.g.
>>> dp = MyDataPipe(csvfile)
>>> for row in dp.train_dataloader:
... print(row)
...
(tensor([1.23, 4.56, 7.89]), 0)
(tensor([9.87, 6.54, 3.21]), 1)
(tensor([9.99, 8.88, 7.77]), 0)
(tensor([1.11, 2.22, 9.87]), 1)
I've tried writing the dataloader if we are just expect the same no. of dataloader's row as per the input file, this works:
import torch
from torch.utils.data import DataLoader2
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
import pytorch_lightning as pl
content = """imagefile,label
train/0/16585.png,0
train/0/56789.png,0"""
with open('myfile.csv', 'w') as fout:
fout.write(content)
def optimus_prime(row):
"""This functions returns two data points with some arbitrary vectors.
>>> row = {'imagefile': 'train/0/16585.png', label: 0}
>>> optimus_prime(row)
(tensor([1.23, 4.56, 7.89]), 0)
"""
# We are using torch.rand here but there is an actual function
# that converts the png file into a vector.
vector1 = torch.rand(3)
return vector1, row['label']
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files: list[str],
skip_lines: int = 0,
tranform_func: Callable = None
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = (
IterableWrapper(iterable=self.csv_files)
.open_files()
.parse_csv_as_dict(skip_lines=self.skip_lines)
)
if tranform_func:
self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
dp = MyDataPipe('myfile.csv', tranform_func=optimus_prime)
for row in dp.train_dataloader:
print(row)
If the optimus_prime
function returns 2 data points, how do I setup the Dataloader such that it can collate the 2 data points accordingly?
How to formulate the collate function or tell the Dataloader that there's 2 inputs in each .map(tranform_func)
output? E.g. if I change the function to:
def optimus_prime(row):
"""This functions returns two data points with some arbitrary vectors.
>>> row = {'imagefile': 'train/0/16585.png', label: 0}
>>> optimus_prime(row)
(tensor([1.23, 4.56, 7.89]), 0), (tensor([3.21, 6.54, 9.87]), 1)
"""
# We are using torch.rand here but there is an actual function
# that converts the png file into a vector.
vector1 = torch.rand(3)
yield vector1, row['label']
yield vector2, not row['label']
I've also tried the following and it works but I need to run the optimus_prime
function twice, but the 2nd .map(tranform_func)
throws a TypeError: tuple indices must be integers or slice not str
...
def optimus_prime_1(row):
# We are using torch.rand here but there is an actual function
# that converts the png file into a vector.
vector1 = torch.rand(3)
yield vector1, row['label']
def optimus_prime_2(row):
# We are using torch.rand here but there is an actual function
# that converts the png file into a vector.
vector2 = torch.rand(3)
yield vector2, not row['label']
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files: list[str],
skip_lines: int = 0,
tranform_funcs: list[Callable] = None
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = (
IterableWrapper(iterable=self.csv_files)
.open_files()
.parse_csv_as_dict(skip_lines=self.skip_lines)
)
if tranform_funcs:
for tranform_func in tranform_funcs:
self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
dp = MyDataPipe('myfile.csv', tranform_funcs=[optimus_prime_1, optimus_prime_2])
for row in dp.train_dataloader:
print(row)
CodePudding user response:
Couldn't you just
import pandas as pd
from torch.utils.data import Dataset
from PIL import Image
class Data(Dataset):
def __init__(self, csv, transform):
self.csv = pd.read_csv(csv)
self.transform = transform
def __len__(self):
return len(self.csv)
def __getitem__(self, idx):
row = self.csv.iloc[idx]
x = self.transform(Image.open(row['imagefile']))
y = torch.tensor(row['label'])
return x, y
And simply use torch.utils.data.DataLoader
? Like
dataset = Data('myfile.csv', optimus)
dataloader = DataLoader(Data, batch_size=batch_size)
Edit 3
class Optimus(IterDataPipe):
def __init__(self, csv, transform1, transform2):
self.csv = pd.read_csv(csv)
self.transform1 = transform1
self.transform2 = transform2
def __len__(self):
return len(self.csv) * 2
def __getitem__(self, idx):
row = self.csv.iloc[idx // 2]
# x = self.transform(Image.open(row['imagefile']))
x = row['imagefile']
y = torch.tensor(row['label'])
if idx % 2 == 1:
y = 1 - y
return x, y
def __iter__(self):
for i in range(len(self)):
yield self[i]
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files,
skip_lines = 0,
tranform_func = None
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = Optimus('myfile.csv', transform1=None, transform2=None)
if tranform_func:
self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
>>> dp = MyDataPipe('myfile.csv', )
>>> for row in dp.train_dataloader():
... print(row)
...
[('train/0/16585.png',), tensor([0])]
[('train/0/16585.png',), tensor([1])]
[('train/0/56789.png',), tensor([0])]
[('train/0/56789.png',), tensor([1])]
CodePudding user response:
From https://discuss.pytorch.org/t/how-to-handle-pytorch-dataset-with-transform-function-that-returns-1-output-per-row-of-data/162160, there's a reference to use .flatmap()
instead of .map()
:
https://pytorch.org/data/main/generated/torchdata.datapipes.iter.FlatMapper.html
https://pytorch.org/data/main/generated/torchdata.datapipes.iter.Mapper.html
By changing the transformation function to return N no. of data points per row of data form the csvfile, e.g.
def optimus_prime(row):
"""This functions returns two data points with some arbitrary vectors.
>>> row = {'imagefile': 'train/0/16585.png', label: 0}
>>> optimus_prime(row)
(tensor([1.23, 4.56, 7.89]), 0)
"""
# We are using torch.rand here but there is an actual function
# that converts the png file into a vector.
vector1 = torch.rand(3)
vector2 = torch.rand(3)
return [(vector1, row['label']), (vector2, row['label'])]
Changing the code to use the .flatmap()
as such works:
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files,
skip_lines=0
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = (
IterableWrapper(iterable=self.csv_files)
.open_files()
.parse_csv_as_dict(skip_lines=self.skip_lines)
)
self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(optimus_prime)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
Full working example:
import torch
from torch.utils.data import DataLoader2
import pytorch_lightning as pl
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper
content = """imagefile,label
train/0/16585.png,0
train/0/56789.png,0"""
with open('myfile.csv', 'w') as fout:
fout.write(content)
def optimus_prime(row):
"""This functions returns two data points with some arbitrary vectors.
>>> row = {'imagefile': 'train/0/16585.png', label: 0}
>>> optimus_prime(row)
(tensor([1.23, 4.56, 7.89]), 0)
"""
# We are using torch.rand here but there is an actual function
# that converts the png file into a vector.
vector1 = torch.rand(3)
vector2 = torch.rand(3)
return [(vector1, row['label']), (vector2, row['label'])]
class MyDataPipe(pl.LightningDataModule):
def __init__(
self,
csv_files,
skip_lines=0
):
super().__init__()
self.csv_files: list[str] = csv_files
self.skip_lines: int = skip_lines
# Initialize a datapipe.
self.dp_chained_datapipe: IterDataPipe = (
IterableWrapper(iterable=self.csv_files)
.open_files()
.parse_csv_as_dict(skip_lines=self.skip_lines)
)
self.dp_chained_datapipe = self.dp_chained_datapipe.flatmap(optimus_prime)
def train_dataloader(self, batch_size=1) -> DataLoader2:
return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
dp = MyDataPipe(['myfile.csv'])
for row in dp.train_dataloader():
print(row)
[out]:
[tensor([[0.6003, 0.1200, 0.5175]]), ('0',)]
[tensor([[0.0628, 0.7004, 0.3169]]), ('0',)]
[tensor([[0.0623, 0.4608, 0.7456]]), ('0',)]
[tensor([[0.7454, 0.5326, 0.7459]]), ('0',)]