I am creating an RNN model to process videos of a fixed length (10 frames). Each video is stored as a set of images (the number of images varies between videos) in its own folder. Before passing a batch of frames to the RNN model, however, I pre-process the images of each video with a ResNet feature extractor. I am using a custom data generator that takes the paths of the image folders, pre-processes the images, and then passes them to the model.
I had rather clunkily been doing this without a data generator, but that is not really practical: I have a training set of >10,000 videos and I also later wish to perform data augmentation.
This is the code of my custom data generator:
import os
import time
from datetime import datetime
from pathlib import Path

import cv2
import numpy as np
from PIL import Image
from tensorflow import keras


class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, video_paths,
                 batch_size=32, video_length=10, dim=(224,224),
                 n_channels=3, n_classes=4, IMG_SIZE=224, MAX_SEQ_LENGTH=10,
                 NUM_FEATURES=2048, shuffle=True):
        'Initialization'
        self.list_IDs = list_IDs
        self.labels = labels
        self.video_paths = video_paths
        self.batch_size = batch_size
        self.dim = dim
        self.video_length = video_length
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.IMG_SIZE = IMG_SIZE
        self.MAX_SEQ_LENGTH = MAX_SEQ_LENGTH
        self.NUM_FEATURES = NUM_FEATURES
        self.shuffle = shuffle
        self.on_epoch_end()

    def crop_center_square(self, frame):
        y, x = frame.shape[0:2]
        min_dim = min(y, x)
        start_x = (x // 2) - (min_dim // 2)
        start_y = (y // 2) - (min_dim // 2)
        return frame[start_y : start_y + min_dim, start_x : start_x + min_dim]

    def load_series(self, videopath):
        frames = []
        image_paths = [os.path.join(videopath, o) for o in os.listdir(videopath)]
        # Sample MAX_SEQ_LENGTH evenly spaced frame indices across the video.
        frame_num = np.linspace(0, len(image_paths) - 1, num=self.MAX_SEQ_LENGTH)
        frame_num = frame_num.astype(int)
        resize = (self.IMG_SIZE, self.IMG_SIZE)
        for ix in frame_num:
            image = Image.open(image_paths[ix])
            im_array = np.asarray(image)
            im_array = self.crop_center_square(im_array)
            im_array = cv2.resize(im_array, resize)
            # Replicate the single-channel image across 3 channels for the ResNet input.
            stacked_im_array = np.stack((im_array,) * 3, axis=-1)
            frames.append(stacked_im_array)
        return np.array(frames)

    def build_feature_extractor(self):
        feature_extractor = keras.applications.resnet_v2.ResNet152V2(
            weights="imagenet",
            include_top=False,
            pooling="avg",
            input_shape=(self.IMG_SIZE, self.IMG_SIZE, 3),
        )
        preprocess_input = keras.applications.resnet_v2.preprocess_input

        inputs = keras.Input((self.IMG_SIZE, self.IMG_SIZE, 3))
        preprocessed = preprocess_input(inputs)
        outputs = feature_extractor(preprocessed)
        return keras.Model(inputs, outputs, name="feature_extractor")

    def __getitem__(self, index):
        'Generate one batch of data'
        # Generate indexes of the batch
        indexes = self.indexes[index * self.batch_size : (index + 1) * self.batch_size]
        # Find list of IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]
        # Generate data
        [frame_features, frame_masks], frame_labels = self._generate_X(list_IDs_temp)
        return [frame_features, frame_masks], frame_labels

    def _generate_X(self, list_IDs_temp):
        'Generates data containing batch_size videos'
        # Initialization
        frame_masks = np.zeros(shape=(self.batch_size, self.MAX_SEQ_LENGTH), dtype="bool")
        frame_features = np.zeros(shape=(self.batch_size, self.MAX_SEQ_LENGTH, self.NUM_FEATURES), dtype="float32")
        frame_labels = np.zeros(shape=(self.batch_size), dtype="int")
        feature_extractor = self.build_feature_extractor()
        tt = time.time()
        for idx, ID in enumerate(list_IDs_temp):
            videopath = self.video_paths[ID]
            video_frame_label = self.labels[ID]
            # Gather all its frames and add a batch dimension.
            frames = self.load_series(Path(videopath))
            # At this point frames.shape = (10, 224, 224, 3)
            frames = frames[None, ...]
            # After this, frames.shape = (1, 10, 224, 224, 3)
            # Initialize placeholders to store the masks and features of the current video.
            temp_frame_mask = np.zeros(shape=(1, self.MAX_SEQ_LENGTH,), dtype="bool")
            # temp_frame_mask.shape = (1, 10)
            temp_frame_features = np.zeros(shape=(1, self.MAX_SEQ_LENGTH, self.NUM_FEATURES), dtype="float32")
            # temp_frame_features.shape = (1, 10, 2048)
            # Extract features from the frames of the current video.
            for i, batch in enumerate(frames):
                video_length = batch.shape[0]
                length = min(self.MAX_SEQ_LENGTH, video_length)
                for j in range(length):
                    temp_frame_features[i, j, :] = feature_extractor.predict(batch[None, j, :])
                temp_frame_mask[i, :length] = 1  # 1 = not masked, 0 = masked

            frame_features[idx,] = temp_frame_features.squeeze()
            frame_masks[idx,] = temp_frame_mask.squeeze()
            frame_labels[idx] = video_frame_label
        elapsed = time.time() - tt
        print(f'Pre-process length: {elapsed}')
        return [frame_features, frame_masks], frame_labels

    def on_epoch_end(self):
        'Updates indexes after each epoch'
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)
This is the code for the RNN model:
label_processor = keras.layers.StringLookup(num_oov_indices=0, vocabulary=np.unique(train_df["view"]))
print(label_processor.get_vocabulary())

train_list_IDs = train_df.index
train_labels = train_df["view"].values
train_labels = label_processor(train_labels[..., None]).numpy()
train_video_paths = train_df['series']

training_generator = DataGenerator(train_list_IDs, train_labels, train_video_paths)

test_list_IDs = test_df.index
test_labels = test_df["view"].values
test_labels = label_processor(test_labels[..., None]).numpy()
test_video_paths = test_df['series']

testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths)

# Utility for our sequence model.
def get_sequence_model():
    class_vocab = label_processor.get_vocabulary()

    frame_features_input = keras.Input((MAX_SEQ_LENGTH, NUM_FEATURES))
    mask_input = keras.Input((MAX_SEQ_LENGTH,), dtype="bool")

    # Refer to the following tutorial to understand the significance of using `mask`:
    # https://keras.io/api/layers/recurrent_layers/gru/
    x = keras.layers.GRU(16, return_sequences=True)(
        frame_features_input, mask=mask_input
    )
    x = keras.layers.GRU(8)(x)
    x = keras.layers.Dropout(0.4)(x)
    x = keras.layers.Dense(8, activation="relu")(x)
    output = keras.layers.Dense(len(class_vocab), activation="softmax")(x)

    rnn_model = keras.Model([frame_features_input, mask_input], output)

    rnn_model.compile(
        loss="sparse_categorical_crossentropy", optimizer="adam", metrics=["accuracy"]
    )
    return rnn_model

# Utility for running experiments.
def run_experiment():
    now = datetime.now()
    current_time = now.strftime("%d_%m_%Y_%H_%M_%S")
    filepath = os.path.join(Path('F:/RNN'), f'RNN_ResNet_Model_{current_time}')
    checkpoint = keras.callbacks.ModelCheckpoint(
        filepath, save_weights_only=True, save_best_only=True, verbose=1
    )

    seq_model = get_sequence_model()
    history = seq_model.fit(training_generator,
                            epochs=EPOCHS,
                            callbacks=[checkpoint],
                            )
    seq_model.load_weights(filepath)
    _, accuracy = seq_model.evaluate(testing_generator)
    print(f"Test accuracy: {round(accuracy * 100, 2)}%")

    return history, accuracy, seq_model

_, accuracy, sequence_model = run_experiment()
I am struggling to figure out how to pass the results of my custom data generator to my RNN model. How can I best rewrite my code to work with either model.fit() or model.fit_generator()?
Thank you in advance!
CodePudding user response:
Please specify in your questions what exactly it is you're struggling with: do you expect different results, is your code slow, or do you get errors? Based on your code I see some issues and would suggest the following adjustments:
The __getitem__() function of a DataGenerator is called every time you retrieve a batch of data from your generator. Within that function you call _generate_X(), which also re-initializes the pretrained ResNet feature extractor on every single batch through feature_extractor = self.build_feature_extractor(). This is highly inefficient.
As an alternative, I would propose removing the model creation from your generator class and instead creating the feature extractor in your main notebook, passing it as a parameter to your DataGenerator instances.
In your main file:
def build_feature_extractor(): [...]

feature_extractor = build_feature_extractor()

testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths, feature_extractor)
For the generator class:
class DataGenerator(keras.utils.Sequence):
    'Generates data for Keras'
    def __init__(self, list_IDs, labels, video_paths, feature_extractor,
                 batch_size=32, video_length=10, dim=(224,224),
                 n_channels=3, n_classes=4, IMG_SIZE=224, MAX_SEQ_LENGTH=10,
                 NUM_FEATURES=2048, shuffle=True):
        'Initialization'
        self.list_IDs = list_IDs
        [...]
        self.feature_extractor = feature_extractor
        [...]
and then adjust to this:
temp_frame_features[i, j, :] = self.feature_extractor.predict(batch[None, j, :])
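Putting it together, a minimal sketch of the adjusted main file could look like the following (this reuses the ResNet152V2 setup from your own build_feature_extractor(), with IMG_SIZE = 224 hard-coded for brevity since the function no longer lives inside the class):

def build_feature_extractor():
    # Built once here, instead of once per batch inside the generator.
    feature_extractor = keras.applications.resnet_v2.ResNet152V2(
        weights="imagenet",
        include_top=False,
        pooling="avg",
        input_shape=(224, 224, 3),
    )
    preprocess_input = keras.applications.resnet_v2.preprocess_input
    inputs = keras.Input((224, 224, 3))
    outputs = feature_extractor(preprocess_input(inputs))
    return keras.Model(inputs, outputs, name="feature_extractor")

feature_extractor = build_feature_extractor()

# Both generators now share the single pre-built extractor.
training_generator = DataGenerator(train_list_IDs, train_labels, train_video_paths, feature_extractor)
testing_generator = DataGenerator(test_list_IDs, test_labels, test_video_paths, feature_extractor)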
You have correctly used the generator in your .fit call; model.fit(training_generator, ...) will feed your model the batches created by __getitem__().
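As a side note, a keras.utils.Sequence can also be passed as validation_data, so (optionally, if you want per-epoch test metrics) the fit call in run_experiment() could become:

history = seq_model.fit(
    training_generator,
    validation_data=testing_generator,  # optional: evaluate the test generator each epoch
    epochs=EPOCHS,
    callbacks=[checkpoint],
)

This also gives your ModelCheckpoint(save_best_only=True) a val_loss to monitor, which it expects by default.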
CodePudding user response:
The error I was getting was
raise NotImplementedError
(raised from keras.utils.Sequence). Rather stupidly, I had forgotten to include the following method in the DataGenerator class:
def __len__(self):
    'Denotes the number of batches per epoch'
    return int(np.floor(len(self.list_IDs) / self.batch_size))
The error went away after that.
obsolete_hegemony did give me an excellent suggestion to optimise my code and separate the feature extraction pre-processing!
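One extra note on the __len__() above, for anyone copying it: because of np.floor, any samples beyond the last full batch are silently skipped every epoch. A possible alternative (a sketch, not what I actually used) is to round up and let the final batch be smaller:

def __len__(self):
    # Round up so the last, partially filled batch is still served.
    return int(np.ceil(len(self.list_IDs) / self.batch_size))

This would also require _generate_X() to size its arrays from len(list_IDs_temp) instead of self.batch_size.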