I apologize as this will be a bit of a long question.
Both my inputs and outputs are variable length sequences. The input is first run through an embedder like word2vec and then normalized while the output is composed of binary labels for each position in a sequence.
These sequences cannot be readily converted into a fixed-length form as their lengths fall on an exponential distribution. The vast majority of these sequences are under 500 tokens long, but the upper limit of my dataset is 3,005 tokens long. Some extremes in a more exhaustive dataset would fall in the range of 35,000 tokens long. Regardless, it's not very practical to just pad all of my data to a fixed length. Further, I would like to avoid implementing a loop where I pad the sequences and use train_on_batch due to concerns of my accuracy metrics appearing higher than they should due to the padded inputs.
My model is a 1D version of u-net. One issue this creates comes from the pooling/upsampling operations. In order for sequences to be upsampled to the same size as the skip-connections they're attached to, the length of my sequences needs to fall on a multiple of 16.
Thus, my idea was to create a custom layer which would fall directly after the input layer and grab the length of all input sequences. Then, it would calculate the length it needed to pad its input to with the following expression 'int(16*np.ceil(max(lengths)/16))' which should round up the highest length to the nearest 16th. It would then use this to pad the sequences to the calculated length and use the original length to generate a mask. The padded sequences would be passed to the next layer in the model while the input mask would only return at the end to truncate the output to the proper length. Or, at the very least, to zero out the effects of the padded portions of the model on the weights during training.
I did get this to work at least for the predict call of the model by creating both a custom layer and creating a custom model wrapper. Unfortunately, all of my attempts to implement the same for the train_step and test_step functions have failed, usually getting errors saying that I can't convert a keras tensor to a numpy array. Which I would normally understand since that would ruin backpropagation, but I don't need to backpropagate through the custom layer. I almost need it to be treated as an internal input layer. I got frustrated and erased the code I wrote yesterday for both the train and test steps after spending 6 hours on a single problem, but I do have the rest of my implementation:
class AutomaticPadder(tf.keras.layers.Layer):
def __init__(self, factor=16):
super(AutomaticPadder, self).__init__()
self.factor = factor
def __call__(self, inputs):
#init = tf.cast(inputs,tf.RaggedTensor)
#lengths = [seq.shape[0] for seq in inputs]
lengths = list(x.shape[0] for x in inputs)
# print(lengths)
max_len = int(self.factor*tf.math.ceil(max(lengths)/self.factor))
# print(max_len)
masks = [[True]*length for length in lengths]
# print(masks)
sequences = tf.constant(pad_sequences(inputs.to_list(),max_len,dtype="float32",padding="post",value = 0))
masks = pad_sequences(masks,max_len,dtype="bool",padding="post",value=False)
return sequences, masks
class ModelPadder(tf.keras.Model):
def __init__(self,model,factor=16):
super(ModelPadder, self).__init__()
self.model = model
self.padder = AutomaticPadder(factor)
def pad(self,inputs):
sequences,masks = self.padder(inputs)
return sequences
def predict(self, instances, **kwargs):
if type(instances) is tf.RaggedTensor:
inputs = instances
elif type(instances) tf.Tensor:
inputs = tf.RaggedTensor.from_tensor(instances)
else:
inputs = tf.ragged.constant(np.asarray(instances))
padded_inputs, masks = self.padder(inputs)
# print(inputs)
y_pred = self.model.predict(padded_inputs)
y_pred = tf.ragged.boolean_mask(y_pred,masks)
none_axes = [i for i in range(len(y_pred.shape)) if y_pred.shape[i] == None]
# print(none_axes)
const_query = [tf.math.reduce_std(tf.cast(y_pred.row_lengths(axis=i),tf.float32)).numpy() == 0 for i in none_axes]
# print(y_pred.row_lengths(axis=1))
# print(const_query)
if all(const_query):
return np.asarray(y_pred.to_tensor())
else:
return y_pred
def __call__(self,*args,**kwargs):
return self.model(*args,**kwargs)
Note: in the predict call, I also have the code check if the lengths of all sequences are the same by calculating the standard deviation of the lengths and convert to a regular tensor if the standard deviation is zero. This was the reason for the variables none_axes and const_query.
Any suggestions?
CodePudding user response:
No one responded, but I gave myself a few days and came back to the problem and figured out a solution. I figured I might as well post it here in case anyone else comes across a similar issue.
Note: It is very important that the model be compiled with run_eagerly set to true as shown below.
class AutomaticPadder(tf.keras.layers.Layer):
def __init__(self, factor=16):
super(AutomaticPadder, self).__init__()
self.factor = factor
def __call__(self, inputs):
inputs = tf.stop_gradient(inputs)
# print(type(inputs))
max_len = max(inputs.row_lengths())
max_len = int(self.factor*tf.math.ceil(max_len/self.factor))
masks = tf.math.reduce_any(tf.ones_like(inputs),-1).to_tensor(False,shape=(inputs.shape[0],max_len))
sequences = inputs.to_tensor(0,shape=(inputs.shape[0],max_len,inputs.shape[2]))
return sequences,masks
class ModelPadder(tf.keras.Model):
def __init__(self,model,factor=16):
super(ModelPadder, self).__init__()
self.model = model
self.padder = AutomaticPadder(factor)
def pad(self,inputs):
sequences,masks = self.padder(inputs)
return sequences
def predict(self, instances, **kwargs):
inputs = self.convert_to_ragged(instances) # convert to ragged tensor
padded_inputs, masks = self.padder(inputs) # pad sequences to nearest factor multiple of factor
y_pred = self.model.predict(padded_inputs)
y_pred = tf.ragged.boolean_mask(y_pred,masks)
none_axes = [i for i in range(len(y_pred.shape)) if y_pred.shape[i] == None]
const_query = [tf.math.reduce_std(tf.cast(y_pred.row_lengths(axis=i),tf.float32)).numpy() == 0 for i in none_axes]
if all(const_query):
return np.asarray(y_pred.to_tensor())
else:
return y_pred
def convert_to_ragged(self,instances):
if type(instances) is tf.RaggedTensor:
out = instances
elif type(instances) is tf.Tensor:
out = tf.RaggedTensor.from_tensor(instances)
else:
out = tf.ragged.constant(np.asarray(instances))
return out
def test_step(self, data):
# Unpack the data
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
x = self.convert_to_ragged(x)
x,masks = self.padder(x)
# Compute predictions
y_pred = self(x, training=False)
y_pred = tf.ragged.boolean_mask(y_pred,masks)
# Updates the metrics tracking the loss
self.compiled_loss(y, y_pred, regularization_losses=self.losses)
# Update the metrics.
self.compiled_metrics.update_state(y, y_pred)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
def train_step(self, data):
# Unpack the data. Its structure depends on your model and
# on what you pass to `fit()`.
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
if len(data) == 3:
x, y, sample_weight = data
else:
sample_weight = None
x, y = data
x = self.convert_to_ragged(x)
x,masks = self.padder(x)
#x_copy = tf.make_ndarray(x)
with tf.GradientTape() as tape:
y_pred = self(x, training=True) # Forward pass
y_pred = tf.ragged.boolean_mask(y_pred,masks) # truncate according to masks
# Compute the loss value.
# The loss function is configured in `compile()`.
loss = self.compiled_loss(
y,
y_pred,
sample_weight=sample_weight,
regularization_losses=self.losses,
)
# Compute gradients
trainable_vars = self.trainable_variables
gradients = tape.gradient(loss, trainable_vars)
# Update weights
self.optimizer.apply_gradients(zip(gradients, trainable_vars))
# Update the metrics.
# Metrics are configured in `compile()`.
self.compiled_metrics.update_state(y, y_pred, sample_weight=sample_weight)
# Return a dict mapping metric names to current value.
# Note that it will include the loss (tracked in self.metrics).
return {m.name: m.result() for m in self.metrics}
def __call__(self,*args,**kwargs):
return self.model(*args,**kwargs)
inner_model = create_model() # create model
model = ModelPadder(inner_model,factor=16)
model.compile(run_eagerly=True)