Keras custom DataGenerator for videos; how to pass the correct outputs to my model?


I am creating an RNN model to process videos of a fixed length (10 frames). Each video is stored as a folder of images, with a varying number of images per video. Before passing a batch of frames to the RNN model, however, I pre-process each frame with a ResNet feature extractor. I am using a custom data generator to take the paths of the image folders, pre-process the images and then pass the results to the model.
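For orientation, these are roughly the shapes flowing through the pipeline, based on the defaults in the generator code below:

# Per video (10 frames sampled evenly from its folder):
#   frames:          (10, 224, 224, 3)
#   ResNet features: (10, 2048)   # pooled ResNet152V2 output per frame
# Per batch handed to the RNN:
#   frame_features:  (batch_size, 10, 2048)
#   frame_masks:     (batch_size, 10)
#   frame_labels:    (batch_size,)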

I have rather clunkily been doing this without a data generator but this is not really practical as I have a training set of >10,000 videos and also later wish to perform data augmentation.

This is the code of my custom data generator:

import os
import time
from pathlib import Path

import cv2
import numpy as np
from PIL import Image
from tensorflow import keras

class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, video_paths,
                 batch_size=32, video_length=10, dim=(224,224),
                 n_channels=3, n_classes=4, IMG_SIZE = 224, MAX_SEQ_LENGTH = 10,
                 NUM_FEATURES = 2048, shuffle=True):
        'Initialization'
        
        self.list_IDs = list_IDs
        self.labels = labels
        self.video_paths = video_paths        
        self.batch_size = batch_size
        self.dim = dim
        self.video_length = video_length
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.IMG_SIZE = IMG_SIZE
        self.MAX_SEQ_LENGTH = MAX_SEQ_LENGTH
        self.NUM_FEATURES = NUM_FEATURES
        self.shuffle = shuffle
        self.on_epoch_end()
    
    def crop_center_square(self, frame):
        # Crop the largest centered square from the frame.
        y, x = frame.shape[0:2]
        min_dim = min(y, x)
        start_x = (x // 2) - (min_dim // 2)
        start_y = (y // 2) - (min_dim // 2)
        return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]
    
    def load_series(self, videopath):
        frames = []
        image_paths = [os.path.join(videopath, o) for o in os.listdir(videopath)]
        # Sample self.MAX_SEQ_LENGTH frames evenly across the video.
        frame_num = np.linspace(0, len(image_paths) - 1, num=self.MAX_SEQ_LENGTH)
        frame_num = frame_num.astype(int)
        resize = (self.IMG_SIZE, self.IMG_SIZE)
        
        for ix in frame_num:
            image = Image.open(image_paths[ix])
            im_array = np.asarray(image)
            im_array = self.crop_center_square(im_array)
            im_array = cv2.resize(im_array, resize)
            # Stack the greyscale frame into three identical channels for ResNet.
            stacked_im_array = np.stack((im_array,) * 3, axis=-1)
            frames.append(stacked_im_array)
            
        return np.array(frames)
    
    def build_feature_extractor(self):
        feature_extractor = keras.applications.resnet_v2.ResNet152V2(
            weights="imagenet",
            include_top=False,
            pooling="avg",
            input_shape=(self.IMG_SIZE, self.IMG_SIZE, 3),
        )
        preprocess_input = keras.applications.resnet_v2.preprocess_input

        inputs = keras.Input((self.IMG_SIZE, self.IMG_SIZE, 3))
        preprocessed = preprocess_input(inputs)

        outputs = feature_extractor(preprocessed)
        return keras.Model(inputs, outputs, name="feature_extractor")


    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size : (index + 1) * self.batch_size]
        
        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        
        # Generate data
        [frame_features, frame_masks], frame_labels = self._generate_X(list_IDs_temp)
      
        return [frame_features, frame_masks], frame_labels
    
    def _generate_X(self, list_IDs_temp):
        'Generates data containing batch_size videos'
        # Initialization
        frame_masks = np.zeros(shape=(self.batch_size, self.MAX_SEQ_LENGTH), dtype="bool")
        frame_features = np.zeros(shape=(self.batch_size, self.MAX_SEQ_LENGTH, self.NUM_FEATURES), dtype="float32")
        frame_labels = np.zeros(shape=(self.batch_size), dtype="int")
        feature_extractor = self.build_feature_extractor()
        tt = time.time()
        
        for idx, ID in enumerate(list_IDs_temp):
            videopath = self.video_paths[ID]
            video_frame_label = self.labels[ID]
            # Gather all its frames and add a batch dimension.
            frames = self.load_series(Path(videopath))
            
            # At this point frames.shape = (10, 224, 224, 3)
            frames = frames[None, ...]
            # After this, frames.shape = (1, 10, 224, 224, 3)

            # Initialize placeholders to store the masks and features of the current video.
            temp_frame_mask = np.zeros(shape=(1, self.MAX_SEQ_LENGTH), dtype="bool")
            # temp_frame_mask.shape = (1, MAX_SEQ_LENGTH)

            temp_frame_features = np.zeros(shape=(1, self.MAX_SEQ_LENGTH, self.NUM_FEATURES), dtype="float32")
            # temp_frame_features.shape = (1, MAX_SEQ_LENGTH, NUM_FEATURES)
            
            # Extract features from the frames of the current video.
            for i, batch in enumerate(frames):
                video_length = batch.shape[0]
                length = min(self.MAX_SEQ_LENGTH, video_length)
                for j in range(length):
                    temp_frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
                temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked
                
            frame_features[idx,] = temp_frame_features.squeeze()
            frame_masks[idx,] = temp_frame_mask.squeeze()
            frame_labels[idx] = video_frame_label
        elapsed = time.time() - tt
        print(f'Pre-process length: {elapsed}')
        
        return [frame_features, frame_masks], frame_labels

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

This is the code for the RNN model:

label_processor = keras.layers.StringLookup(num_oov_indices=0, vocabulary=np.unique(train_df["view"]))

print(label_processor.get_vocabulary())

train_list_IDs = train_df.index
train_labels = train_df["view"].values
train_labels = label_processor(train_labels[..., None]).numpy()
train_video_paths = train_df['series']

training_generator = DataGenerator(train_list_IDs, train_labels, train_video_paths)

test_list_IDs = test_df.index
test_labels = test_df["view"].values
test_labels = label_processor(test_labels[..., None]).numpy()
test_video_paths = test_df['series']

testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths)

# Utility for our sequence model.
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)
    
    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model


# Utility for running experiments.
def run_experiment():
    now = datetime.now()
    current_time = now.strftime("%d_%m_%Y_%H_%M_%S")
    filepath = os.path.join(Path('F:/RNN'), f'RNN_ResNet_Model_{current_time}')
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(training_generator,
        epochs=EPOCHS,
        callbacks=[checkpoint],
    )
    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate(testing_generator)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, accuracy, seq_model


_, accuracy, sequence_model = run_experiment()

I am struggling to figure out how to pass the results of my custom data generator to my RNN model. How can I best rewrite my code to work with either model.fit() or model.fit_generator()?

Thank you in advance!

CodePudding user response:

Please specify in your question what exactly you're struggling with. Do you expect different results, is your code slow, or do you get errors? Based on your code I see some issues and would suggest the following adjustments:

The __getitem__() function of a DataGenerator is called every time you retrieve a batch of data from the generator. Within that function you call _generate_X(), which re-initializes the pretrained ResNet feature extractor via feature_extractor = self.build_feature_extractor() on every single batch. This is highly inefficient.

As an alternative, I would propose removing the model creation from your generator class and instead creating the feature extractor once in your main script, then passing it as a parameter to your DataGenerator instances:

In your main file:

def build_feature_extractor(): [...]

feature_extractor = build_feature_extractor()

testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths, feature_extractor)
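A fuller sketch of that main-file wiring (using the build_feature_extractor body from the question, with self dropped and IMG_SIZE as a module-level constant) might look like this:

IMG_SIZE = 224

def build_feature_extractor():
    feature_extractor = keras.applications.resnet_v2.ResNet152V2(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(IMG_SIZE, IMG_SIZE, 3),
    )
    preprocess_input = keras.applications.resnet_v2.preprocess_input

    inputs = keras.Input((IMG_SIZE, IMG_SIZE, 3))
    outputs = feature_extractor(preprocess_input(inputs))
    return keras.Model(inputs, outputs, name="feature_extractor")

# Build the heavy ResNet model exactly once, then share it across generators.
feature_extractor = build_feature_extractor()
training_generator = DataGenerator(train_list_IDs, train_labels, train_video_paths, feature_extractor)
testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths, feature_extractor)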

For the generator class:

class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, video_paths, feature_extractor,
                 batch_size=32, video_length=10, dim=(224,224),
                 n_channels=3, n_classes=4, IMG_SIZE = 224, MAX_SEQ_LENGTH = 10,
                 NUM_FEATURES = 2048, shuffle=True):
        'Initialization'

        self.list_IDs = list_IDs
        [...]
        self.feature_extractor = feature_extractor
        [...]

and then adjust to this:

temp_frame_features[i, j, :] = self.feature_extractor.predict(batch[None, j, :])
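While touching that line, a further speedup is possible: rather than one predict() call per frame, all sampled frames of a video can be pushed through the extractor in a single batched call. A minimal sketch, assuming batch holds the video's frames with shape (video_length, IMG_SIZE, IMG_SIZE, 3):

# Replace the inner per-frame loop with one batched predict() per video.
length = min(self.MAX_SEQ_LENGTH, batch.shape[0])
temp_frame_features[i, :length, :] = self.feature_extractor.predict(batch[:length], verbose=0)
temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked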

You have used the generator correctly in your .fit call: model.fit(training_generator, ...) will feed your model the batches created by __getitem__().

CodePudding user response:

The error I was getting was:

NotImplementedError

Rather stupidly, I had forgotten to implement the following method in the DataGenerator class:

def __len__(self):
    'Denotes the number of batches per epoch'
    return int(np.floor(len(self.list_IDs) / self.batch_size))

The error went away after that.
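For context: keras.utils.Sequence requires subclasses to implement both __len__() and __getitem__(); the base class stubs raise NotImplementedError, which is exactly what fit() triggers when __len__() is missing. A minimal skeleton:

class MinimalSequence(keras.utils.Sequence):
    def __len__(self):
        # Number of batches per epoch.
        ...

    def __getitem__(self, index):
        # Return one batch as (inputs, targets).
        ...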

obsolete_hegemony did give me an excellent suggestion to optimise my code and separate the feature extraction pre-processing!
