Home > Back-end >  How to handle Pytorch Dataset with transform function that returns >1 output per row of data?
How to handle Pytorch Dataset with transform function that returns >1 output per row of data?

Time:09-28

Given a myfile.csv file that looks like:

imagefile,label
train/0/16585.png,0
train/0/56789.png,0

The goal is to create a PyTorch DataLoader that, when iterated, returns 2x the data points (two data points per row of the CSV file), e.g.

>>> dp = MyDataPipe(csvfile)
>>> for row in dp.train_dataloader:
...     print(row)
...
(tensor([1.23, 4.56, 7.89]), 0)
(tensor([9.87, 6.54, 3.21]), 1)
(tensor([9.99, 8.88, 7.77]), 0)
(tensor([1.11, 2.22, 9.87]), 1)

I've tried writing the dataloader for the case where we expect the same number of rows from the dataloader as there are in the input file, and this works:

from typing import Callable, Optional, Union

import pytorch_lightning as pl
import torch
from torch.utils.data import DataLoader2
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper


# Sample CSV used by the example: a header row followed by two records.
content = "\n".join(
    [
        "imagefile,label",
        "train/0/16585.png,0",
        "train/0/56789.png,0",
    ]
)

# Write the sample to disk so the datapipe can open it by path.
with open('myfile.csv', 'w') as fout:
    fout.write(content)


def optimus_prime(row):
    """Turn one parsed CSV row into a single ``(vector, label)`` data point.

    The tensor values below are illustrative only; the vector is random.

    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> optimus_prime(row)
    (tensor([1.23, 4.56, 7.89]), 0)
    """
    # Placeholder: the real code converts the png file into a vector; a
    # random 3-element tensor stands in for it here.
    return torch.rand(3), row['label']
    

class MyDataPipe(pl.LightningDataModule):
    """Lightning data module that streams rows from CSV files via a datapipe.

    Every CSV file is opened and parsed into one dict per row. When a
    transform is supplied it is applied with ``.map()``, so the pipeline
    yields exactly one item per CSV row.
    """

    def __init__(
        self,
        csv_files: Union[str, list[str]],
        skip_lines: int = 0,
        tranform_func: Optional[Callable] = None
    ):
        """Build the datapipe chain.

        Args:
            csv_files: Paths of the CSV files to read. A single path given
                as a plain string is accepted and wrapped into a list.
            skip_lines: Forwarded to ``parse_csv_as_dict(skip_lines=...)``.
            tranform_func: Optional callable mapped over every parsed row.
        """
        super().__init__()
        # IterableWrapper iterates whatever it is given; a bare string would
        # be walked character by character, so wrap it into a list first.
        if isinstance(csv_files, str):
            csv_files = [csv_files]
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe: paths -> open file streams -> row dicts.
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )

        if tranform_func is not None:
            self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the chained datapipe."""
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)

# csv_files is an iterable of paths: pass a list, not a bare string (a string
# would be iterated one character at a time by IterableWrapper).
dp = MyDataPipe(['myfile.csv'], tranform_func=optimus_prime)

# train_dataloader is a method; it has to be called to obtain the loader.
for row in dp.train_dataloader():
    print(row)

If the optimus_prime function returns 2 data points, how do I setup the Dataloader such that it can collate the 2 data points accordingly?

How to formulate the collate function or tell the Dataloader that there's 2 inputs in each .map(tranform_func) output? E.g. if I change the function to:

def optimus_prime(row):
    """Yield two data points, each with an arbitrary vector, for one row.

    The first point carries the row's label, the second the negated label.
    The tensor values below are illustrative only; the vectors are random.

    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> list(optimus_prime(row))
    [(tensor([1.23, 4.56, 7.89]), 0), (tensor([3.21, 6.54, 9.87]), True)]
    """
    # We are using torch.rand here but there is an actual function
    # that converts the png file into a vector.
    vector1 = torch.rand(3)
    # vector2 was referenced but never created, which raised NameError on
    # the second yield.
    vector2 = torch.rand(3)
    yield vector1, row['label']
    # NOTE(review): `not` produces a bool and treats any non-empty string
    # (e.g. '0' as parsed from a CSV) as truthy -- confirm the label type.
    yield vector2, not row['label']

I've also tried the following, which works up to a point: I need to run the optimus_prime function twice (as two separate transforms), and the 2nd .map(tranform_func) throws a TypeError: tuple indices must be integers or slices, not str...


def optimus_prime_1(row):
    """Generator yielding one ``(vector, label)`` pair for ``row``."""
    # Stand-in for the real png-to-vector conversion: a random 3-element
    # tensor is produced instead.
    yield torch.rand(3), row['label']

def optimus_prime_2(row):
    """Generator yielding one ``(vector, not label)`` pair for ``row``."""
    # Stand-in for the real png-to-vector conversion: a random 3-element
    # tensor is produced instead.
    yield torch.rand(3), not row['label']
    

class MyDataPipe(pl.LightningDataModule):
    """Lightning data module that streams CSV rows through chained transforms.

    Every CSV file is opened and parsed into one dict per row. The supplied
    transforms are applied with successive ``.map()`` calls, i.e. they are
    composed sequentially: the second and later callables receive whatever
    the preceding transform returned, not the original row dict.
    """

    def __init__(
        self,
        csv_files: Union[str, list[str]],
        skip_lines: int = 0,
        tranform_funcs: Optional[list[Callable]] = None
    ):
        """Build the datapipe chain.

        Args:
            csv_files: Paths of the CSV files to read. A single path given
                as a plain string is accepted and wrapped into a list.
            skip_lines: Forwarded to ``parse_csv_as_dict(skip_lines=...)``.
            tranform_funcs: Optional callables mapped over the pipeline one
                after another, in list order.
        """
        super().__init__()
        # IterableWrapper iterates whatever it is given; a bare string would
        # be walked character by character, so wrap it into a list first.
        if isinstance(csv_files, str):
            csv_files = [csv_files]
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe: paths -> open file streams -> row dicts.
        self.dp_chained_datapipe: IterDataPipe = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )

        # Each .map() wraps the previous pipe, so transforms are chained.
        for tranform_func in tranform_funcs or []:
            self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the chained datapipe."""
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)

# csv_files is an iterable of paths: pass a list, not a bare string (a string
# would be iterated one character at a time by IterableWrapper).
dp = MyDataPipe(['myfile.csv'], tranform_funcs=[optimus_prime_1, optimus_prime_2])

# train_dataloader is a method; it has to be called to obtain the loader.
for row in dp.train_dataloader():
    print(row)

CodePudding user response:

Couldn't you just

import pandas as pd
from torch.utils.data import Dataset
from PIL import Image

class Data(Dataset):
    """Map-style dataset backed by a CSV with ``imagefile``/``label`` columns.

    Item ``idx`` is the transformed image of row ``idx`` paired with that
    row's label as a tensor.
    """

    def __init__(self, csv, transform):
        # `csv` is handed to pandas.read_csv; `transform` is called on each
        # opened image in __getitem__.
        self.transform = transform
        self.csv = pd.read_csv(csv)

    def __len__(self):
        # One sample per CSV row.
        return self.csv.shape[0]

    def __getitem__(self, idx):
        record = self.csv.iloc[idx]
        image = Image.open(record['imagefile'])
        return self.transform(image), torch.tensor(record['label'])

And simply use torch.utils.data.DataLoader? Like

dataset = Data('myfile.csv', optimus)
# Pass the dataset instance created above, not the Data class itself.
dataloader = DataLoader(dataset, batch_size=batch_size)

Edit 3

class Optimus(IterDataPipe):
    """Datapipe that emits two ``(imagefile, label)`` items per CSV row.

    Even positions carry the row's label unchanged; odd positions carry
    ``1 - label``.
    """

    def __init__(self, csv, transform1, transform2):
        # The two transforms are stored but not applied anywhere in this class.
        self.transform1 = transform1
        self.transform2 = transform2
        self.csv = pd.read_csv(csv)

    def __len__(self):
        # Every CSV row contributes two items.
        return 2 * len(self.csv)

    def __getitem__(self, idx):
        # Positions 2k and 2k + 1 both map to CSV row k.
        row_number, is_second = divmod(idx, 2)
        record = self.csv.iloc[row_number]
        # The image itself is not loaded here; the file path is returned.
        label = torch.tensor(record['label'])
        return record['imagefile'], (1 - label if is_second else label)

    def __iter__(self):
        yield from map(self.__getitem__, range(len(self)))

class MyDataPipe(pl.LightningDataModule):
    """Lightning data module backed by an ``Optimus`` datapipe.

    ``Optimus`` emits the items; an optional transform is mapped over them.
    """

    def __init__(
        self,
        csv_files,
        skip_lines = 0,
        tranform_func = None
    ):
        """Build the datapipe chain.

        Args:
            csv_files: Path of the CSV file handed to ``Optimus``.
            skip_lines: Stored on the instance; not used by this pipeline.
            tranform_func: Optional callable mapped over every emitted item.
        """
        super().__init__()
        self.csv_files: str = csv_files
        self.skip_lines: int = skip_lines

        # Initialize a datapipe from the path given by the caller instead of
        # a hard-coded 'myfile.csv', so the constructor argument is honoured.
        self.dp_chained_datapipe: IterDataPipe = Optimus(self.csv_files, transform1=None, transform2=None)

        if tranform_func:
            self.dp_chained_datapipe = self.dp_chained_datapipe.map(tranform_func)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the chained datapipe."""
        return DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
>>> dp = MyDataPipe('myfile.csv', )
>>> for row in dp.train_dataloader():
...     print(row)
...
[('train/0/16585.png',), tensor([0])]
[('train/0/16585.png',), tensor([1])]
[('train/0/56789.png',), tensor([0])]
[('train/0/56789.png',), tensor([1])]

CodePudding user response:

From https://discuss.pytorch.org/t/how-to-handle-pytorch-dataset-with-transform-function-that-returns-1-output-per-row-of-data/162160, there's a reference to use .flatmap() instead of .map():

By changing the transformation function to return N data points per row of data from the CSV file, e.g.

def optimus_prime(row):
    """Return a list of two ``(vector, label)`` data points for one row.

    Both points carry the row's own label; each gets its own random vector.
    The tensor values below are illustrative only.

    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> optimus_prime(row)
    [(tensor([1.23, 4.56, 7.89]), 0), (tensor([3.21, 6.54, 9.87]), 0)]
    """
    # Placeholder: the real code converts the png file into a vector; random
    # 3-element tensors stand in for it here.
    vectors = (torch.rand(3), torch.rand(3))
    return [(vector, row['label']) for vector in vectors]
    

Changing the code to use the .flatmap() as such works:


class MyDataPipe(pl.LightningDataModule):
    """Lightning data module that expands every CSV row via ``.flatmap()``.

    Rows are parsed into dicts and passed to ``optimus_prime``; ``.flatmap()``
    flattens its returned sequence into individual pipeline items.
    """

    def __init__(self, csv_files, skip_lines=0):
        super().__init__()
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Build the pipeline step by step: paths -> streams -> row dicts.
        path_pipe = IterableWrapper(iterable=self.csv_files)
        stream_pipe = path_pipe.open_files()
        row_pipe = stream_pipe.parse_csv_as_dict(skip_lines=self.skip_lines)

        # Flatten the per-row outputs of optimus_prime into single items.
        self.dp_chained_datapipe: IterDataPipe = row_pipe.flatmap(optimus_prime)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the flattened datapipe."""
        loader = DataLoader2(dataset=self.dp_chained_datapipe, batch_size=batch_size)
        return loader

Full working example:

import torch 

from torch.utils.data import DataLoader2
import pytorch_lightning as pl
from torchdata.datapipes.iter import IterDataPipe, IterableWrapper

# Sample CSV for the full example: header line plus two records (no trailing
# newline).
content = (
    "imagefile,label\n"
    "train/0/16585.png,0\n"
    "train/0/56789.png,0"
)

# Persist the sample so the datapipe can open it by path.
with open('myfile.csv', 'w') as fout:
    fout.write(content)

def optimus_prime(row):
    """Expand one row into a list of two ``(vector, label)`` data points.

    Each point gets its own random vector and the row's unchanged label.
    The tensor values below are illustrative only.

    >>> row = {'imagefile': 'train/0/16585.png', 'label': 0}
    >>> optimus_prime(row)
    [(tensor([1.23, 4.56, 7.89]), 0), (tensor([3.21, 6.54, 9.87]), 0)]
    """
    # Placeholder: the real code converts the png file into a vector; random
    # 3-element tensors stand in for it here.
    data_points = []
    for _ in range(2):
        data_points.append((torch.rand(3), row['label']))
    return data_points
    

class MyDataPipe(pl.LightningDataModule):
    """Lightning data module yielding several data points per CSV row.

    Parsed row dicts are fed to ``optimus_prime`` through ``.flatmap()``, so
    each element of the sequence it returns becomes its own pipeline item.
    """

    def __init__(self, csv_files, skip_lines=0):
        super().__init__()
        self.csv_files: list[str] = csv_files
        self.skip_lines: int = skip_lines

        # Paths -> open file streams -> one dict per CSV row.
        parsed_rows = (
            IterableWrapper(iterable=self.csv_files)
            .open_files()
            .parse_csv_as_dict(skip_lines=self.skip_lines)
        )

        # Expand every row into the individual items optimus_prime returns.
        self.dp_chained_datapipe: IterDataPipe = parsed_rows.flatmap(optimus_prime)

    def train_dataloader(self, batch_size=1) -> DataLoader2:
        """Return a ``DataLoader2`` over the flattened datapipe."""
        pipeline = self.dp_chained_datapipe
        return DataLoader2(dataset=pipeline, batch_size=batch_size)

dp = MyDataPipe(['myfile.csv'])

# Obtain the loader once, then print every item it produces.
train_loader = dp.train_dataloader()
for row in train_loader:
    print(row)

[out]:

[tensor([[0.6003, 0.1200, 0.5175]]), ('0',)]
[tensor([[0.0628, 0.7004, 0.3169]]), ('0',)]
[tensor([[0.0623, 0.4608, 0.7456]]), ('0',)]
[tensor([[0.7454, 0.5326, 0.7459]]), ('0',)]
  • Related