I'm currently having a VAE working with data_x dim = [300000,60,5]
. I want to add data_y dim = [300000,1]
, which is a binary conditioner. However, I keep getting the following error:
ValueError: Graph disconnected: cannot obtain value for tensor Tensor("input_2:0", shape=(None, 1), dtype=float32) at layer "concatenate_1". The following previous layers were accessed without issue: []
I'm pretty sure concatenating the latent inputs with input_y goes wrong somehow, but I can't find the actual solution. I will post my model code below (I use a custom train_step). Maybe my entire approach is wrong?
import numpy as np
import tensorflow as tf
from tensorflow.keras.callbacks import TensorBoard
"""
## Create a sampling layer
"""
class Sampling(tf.keras.layers.Layer):
"""Uses (z_mean, z_log_var) to sample z."""
def call(self, inputs):
z_mean, z_log_var = inputs
batch = tf.shape(z_mean)[0]
dim = tf.shape(z_mean)[1]
epsilon = tf.keras.backend.random_normal(shape=(batch, dim))
return z_mean tf.exp(0.5 * z_log_var) * epsilon
"""
## Build the encoder
"""
latent_dim = 8
N_chan= 5
input_x = tf.keras.layers.Input(shape=(60,N_chan))
input_y = tf.keras.layers.Input(shape=(1,))
x = tf.keras.layers.Masking(mask_value=999)(input_x)
x = tf.keras.layers.Conv1D(32, 3, activation="relu", strides=2, padding="same")(x)
x = tf.keras.layers.Conv1D(64, 3, activation="relu", strides=2, padding="same")(x)
x = tf.keras.layers.Conv1D(64, 3, activation="relu", strides=1, padding="same")(x)
x = tf.keras.layers.Conv1D(64, 3, activation="relu", strides=1, padding="same")(x)
x = tf.keras.layers.Flatten()(x)
conc = tf.keras.layers.Concatenate()([x, input_y])
x = tf.keras.layers.Dense(64, activation='relu')(conc)
# x = tf.keras.layers.Dense(latent_dim)(x)
z_mean = tf.keras.layers.Dense(latent_dim, name="z_mean")(x)
z_log_var = tf.keras.layers.Dense(latent_dim, name="z_log_var")(x)
z = Sampling()([z_mean, z_log_var])
encoder = tf.keras.Model(inputs=[input_x,input_y], outputs=[z_mean, z_log_var, z], name="encoder")
encoder.summary()
embedd = tf.keras.layers.Input(shape=(latent_dim,))
merged_input = tf.keras.layers.Concatenate()([embedd, input_y])
x = tf.keras.layers.Dense(15 * 64, activation='relu')(merged_input)
x = tf.keras.layers.Reshape(target_shape=(15, 64))(x)
x = tf.keras.layers.Conv1DTranspose(filters=64,kernel_size=3,strides=1,padding='same', activation='relu')(x)
x = tf.keras.layers.Conv1DTranspose(filters=64,kernel_size=3,strides=2,padding='same',activation='relu')(x)
x = tf.keras.layers.Conv1DTranspose(filters=64,kernel_size=3,strides=1,padding='same',activation='relu')(x)
x = tf.keras.layers.Conv1DTranspose(filters=32,kernel_size=3,strides=2,padding='same',activation="relu")(x)
decoder_outputs = tf.keras.layers.Conv1DTranspose(filters=N_chan,kernel_size=3,activation="sigmoid",padding="same")(x)
decoder = tf.keras.Model(inputs=embedd, outputs=decoder_outputs, name="decoder")
decoder.summary()
"""
## Define the VAE as a `Model` with a custom `train_step`
"""
class VAE(tf.keras.Model):
def __init__(self, encoder, decoder, **kwargs):
super(VAE, self).__init__(**kwargs)
self.encoder = encoder
self.decoder = decoder
self.total_loss_tracker = tf.keras.metrics.Mean(name="total_loss")
self.reconstruction_loss_tracker = tf.keras.metrics.Mean(
name="reconstruction_loss"
)
self.kl_loss_tracker = tf.keras.metrics.Mean(name="kl_loss")
self.kl_weight = 0.002
@property
def metrics(self):
return [
self.total_loss_tracker,
self.reconstruction_loss_tracker,
self.kl_loss_tracker,
]
def train_step(self, data):
with tf.GradientTape() as tape:
z_mean, z_log_var, z = self.encoder(data)
reconstruction = self.decoder(z)
reconstruction_loss = tf.reduce_mean(
tf.reduce_sum(
tf.keras.losses.binary_crossentropy(data, reconstruction), axis=(1)
)
)
kl_loss = -0.5 * (1 z_log_var - tf.square(z_mean) - tf.exp(z_log_var))
kl_loss = tf.reduce_mean(tf.reduce_sum(kl_loss, axis=1))
total_loss = reconstruction_loss self.kl_weight*kl_loss
grads = tape.gradient(total_loss, self.trainable_weights)
self.optimizer.apply_gradients(zip(grads, self.trainable_weights))
self.total_loss_tracker.update_state(total_loss)
self.reconstruction_loss_tracker.update_state(reconstruction_loss)
self.kl_loss_tracker.update_state(kl_loss)
return {
"loss": self.total_loss_tracker.result(),
"reconstruction_loss": self.reconstruction_loss_tracker.result(),
"kl_loss": self.kl_loss_tracker.result(),
}
#%% load data
file_x = "x_cvae.npy"
file_y = "y_cvae_binary.npy"
data_x,data_y = np.load(file_x),np.expand_dims(np.load(file_y),-1)
#%%
NAME = "Test"
vae = VAE(encoder, decoder)
vae.compile(optimizer=tf.keras.optimizers.Adam())
tensorboard = TensorBoard(log_dir='logs\\{}'.format(NAME),profile_batch=0)
vae.fit([data_x,data_y],data_x, epochs=10, batch_size=64
,validation_data=([data_x,data_y],data_x),callbacks=[tensorboard])```
CodePudding user response:
The problem is that your encoder
and decoder
models are sharing the same input input_y
although they are two separate models. You will have to define a separate input_y
for your decoder
or merge the encoder and decoder into one model. Here is an example of how to use two inputs for both the encoder and the decoder.