I would like to test training a machine learning model on a CPU-only EC2 instance, from a Jupyter notebook.
The code uses TensorFlow 2.8. From the tf docs (https://www.tensorflow.org/api_docs/python/tf/keras/Model#fit):

use_multiprocessing: Boolean. Used for generator or keras.utils.Sequence input only. If True, use process-based threading. If unspecified, use_multiprocessing will default to False. Note that because this implementation relies on multiprocessing, you should not pass non-picklable arguments to the generator as they can't be passed easily to children processes.
I am trying to enable "use_multiprocessing" for model.fit().
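For context, the overall shape I am aiming for is roughly the following (a toy sketch with placeholder data and a placeholder ToySequence class, not my real pipeline):

import math
import numpy as np
import tensorflow as tf
from tensorflow.keras.utils import Sequence

# Toy Sequence that keeps only picklable members (plain numpy arrays),
# so it can be shipped to the worker processes that use_multiprocessing spawns.
class ToySequence(Sequence):
    def __init__(self, n=32, batch_size=8):
        self.x = np.random.randint(0, 10, size=(n, 1))
        self.y = np.random.randint(0, 2, size=(n, 1))
        self.batch_size = batch_size

    def __len__(self):
        return math.ceil(len(self.x) / self.batch_size)

    def __getitem__(self, index):
        s = slice(index * self.batch_size, (index + 1) * self.batch_size)
        return self.x[s], self.y[s]

model = tf.keras.Sequential([
    tf.keras.layers.Embedding(10, 4, input_length=1),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(1, activation="sigmoid"),
])
model.compile(optimizer="adam", loss="binary_crossentropy")
model.fit(ToySequence(), epochs=1, workers=2, use_multiprocessing=True)

My actual code is below.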
import math
import psutil
import numpy as np
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras.utils import Sequence

feature_format = {
    "a": tf.io.FixedLenFeature((1,), dtype=tf.int8),
    "b": tf.io.FixedLenFeature((1,), dtype=tf.int8),
}
label_format = {
    "label": tf.io.FixedLenFeature((1,), dtype=tf.int64, default_value=(0,))
}

class DataGenerator(Sequence):
    def __init__(self):
        features = {'a': tf.constant([1, 2, 3]),
                    'b': tf.constant([-1, -2, -3]),
                    'labels': tf.constant([1, 1, 0])}
        feature_dataset = tf.data.Dataset.from_tensors(features)
        tf.print(f"type{feature_dataset}")
        for x in feature_dataset:
            print(x['a'])

        def _process(input_dataset):
            features = tf.io.parse_single_example(input_dataset, feature_format)
            labels = tf.io.parse_single_example(input_dataset, label_format)
            return (features, labels)

        feature_dataset = feature_dataset.shuffle(1)
        feature_dataset = feature_dataset.map(_process)
        feature_dataset = feature_dataset.cache()
        feature_dataset = feature_dataset.repeat(2)
        feature_dataset = feature_dataset.batch(1)
        self.feature_dataset = feature_dataset.prefetch(1)
        self.data_size = 3
        self.batch_size = 1

    def __len__(self):
        return math.ceil(self.data_size / self.batch_size)

    def __getitem__(self, index):
        return self.feature_dataset
class MyModel(keras.Model):
    def __init__(self):
        super().__init__()
        self.embedding_layer1 = keras.layers.Embedding(10, 32)
        self.embedding_layer2 = keras.layers.Embedding(10, 32)

    # The input data needs to be accessed by column name,
    # so the dataset is created with a dictionary.
    def call(self, input_dataset, training=True):
        f1_data, f2_data = {}, {}
        for col in ['a']:
            f1_data[col] = input_dataset[col]
        for col in ['b']:
            f2_data[col] = input_dataset[col]
        f1_out = self.embedding_layer1(f1_data)
        f2_out = self.embedding_layer2(f2_data)
        result = tf.reduce_sum(tf.multiply(f1_out, f2_out))
        return result
model = MyModel()
dg = DataGenerator()
model.fit(dg,
          epochs=2,
          workers=psutil.cpu_count(),
          use_multiprocessing=True)
TypeError: in user code:
features = tf.io.parse_single_example(input_dataset, feature_format)
TypeError: Expected any non-tensor type, but got a tensor instead.
How can I make a dataset that will be accepted by parse_single_example()? Currently the dataset is a TensorDataset in which each tensor can be accessed by a key. Thanks.
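For reference, my understanding from the docs is that parse_single_example() expects a scalar string tensor holding a serialized tf.train.Example, not a dict of tensors. A toy round trip just to illustrate the expected input type:

# Build a tf.train.Example, serialize it, and parse it back.
example = tf.train.Example(features=tf.train.Features(feature={
    "a": tf.train.Feature(int64_list=tf.train.Int64List(value=[1])),
}))
serialized = tf.constant(example.SerializeToString())
parsed = tf.io.parse_single_example(
    serialized, {"a": tf.io.FixedLenFeature((1,), dtype=tf.int64)})
print(parsed["a"])  # tf.Tensor([1], shape=(1,), dtype=int64)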
UPDATE
I tried the answer below, but got an error.
import psutil
import numpy as np

class MyModel(keras.Model):
    def __init__(self):
        super().__init__()
        self.embedding_layer1 = keras.layers.Embedding(10, 32)
        self.embedding_layer2 = keras.layers.Embedding(10, 32)

    # The input data needs to be accessed by feature name.
    def call(self, input_dataset, training=True):
        f1_data, f2_data = {}, {}
        for col in ['a']:
            f1_data[col] = input_dataset[col]  # error!
        for col in ['b']:
            f2_data[col] = input_dataset[col]
        f1_out = self.embedding_layer1(f1_data)
        f2_out = self.embedding_layer2(f2_data)
        result = tf.reduce_sum(tf.multiply(f1_out, f2_out))
        return result
model = MyModel()
dg = DataGenerator()
model.compile(optimizer=tf.optimizers.Adam(learning_rate=0.001),
              loss=tf.keras.losses.BinaryCrossentropy(),
              run_eagerly=False,
              metrics=[keras.metrics.BinaryAccuracy()])
model.fit(dg,
          epochs=2,
          workers=psutil.cpu_count(),
          use_multiprocessing=True)
TypeError: Exception encountered when calling layer "my_model_7" (type MyModel).
'PrefetchDataset' object is not subscriptable
Call arguments received:
• input_dataset=<PrefetchDataset element_spec=({'a':
TensorSpec(shape=(None, 1), dtype=tf.int64, name=None), 'b':
TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)},
{'label': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)})>
• training=False
CodePudding user response:
If you want to parse / deserialize a tf.train.Example, you have to serialize it first by creating a protobuf string; otherwise it makes no sense. Here is an example:
import math
import tensorflow.keras as keras
import tensorflow as tf
from tensorflow.keras.utils import Sequence

feature_format = {"a": tf.io.FixedLenFeature((1,), dtype=tf.int64),
                  "b": tf.io.FixedLenFeature((1,), dtype=tf.int64)}
label_format = {"label": tf.io.FixedLenFeature((1,), dtype=tf.int64, default_value=(0,))}

def _int64_feature(value):
    return tf.train.Feature(int64_list=tf.train.Int64List(value=[value]))

def _process(input_dataset):
    features = tf.io.parse_single_example(input_dataset, feature_format)
    labels = tf.io.parse_single_example(input_dataset, label_format)
    return features, labels

def serialize_example(a, b, labels):
    feature = {
        'a': _int64_feature(a.numpy()),
        'b': _int64_feature(b.numpy()),
        # Key must match label_format ('label'); with a mismatched key the
        # parser would silently fall back to the default value.
        'label': _int64_feature(labels.numpy()),
    }
    example_proto = tf.train.Example(features=tf.train.Features(feature=feature))
    return example_proto.SerializeToString()

class DataGenerator(Sequence):
    def __init__(self):
        feature_dataset = tf.data.Dataset.from_tensor_slices(([1, 2, 3], [-1, -2, -3], [1, 1, 0]))
        feature_dataset = feature_dataset.shuffle(1)
        # Serialize each element into a tf.train.Example protobuf string ...
        feature_dataset = feature_dataset.map(
            lambda a, b, l: tf.py_function(func=serialize_example, inp=[a, b, l], Tout=tf.string))
        # ... and parse it back into (features, labels) dicts.
        feature_dataset = feature_dataset.map(_process)
        feature_dataset = feature_dataset.cache()
        feature_dataset = feature_dataset.repeat(2)
        feature_dataset = feature_dataset.batch(1)
        self.feature_dataset = feature_dataset.prefetch(1)
        self.data_size = 3
        self.batch_size = 1
        tf.print(f"type{feature_dataset}")
        for data, labels in feature_dataset:
            print(data['a'])

    def __len__(self):
        return math.ceil(self.data_size / self.batch_size)

    def __getitem__(self, index):
        return self.feature_dataset

dg = DataGenerator()
type<BatchDataset element_spec=({'a': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None), 'b': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)}, {'label': TensorSpec(shape=(None, 1), dtype=tf.int64, name=None)})>
tf.Tensor([[1]], shape=(1, 1), dtype=int64)
tf.Tensor([[2]], shape=(1, 1), dtype=int64)
tf.Tensor([[3]], shape=(1, 1), dtype=int64)
tf.Tensor([[1]], shape=(1, 1), dtype=int64)
tf.Tensor([[2]], shape=(1, 1), dtype=int64)
tf.Tensor([[3]], shape=(1, 1), dtype=int64)
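Note that even with the parsing fixed, __getitem__ above still returns the whole dataset, which is why your model's call() ends up receiving a PrefetchDataset. With use_multiprocessing=True, a keras.utils.Sequence is expected to return one batch per index. A minimal sketch of that shape (the in-memory numpy arrays are placeholder data, not your real pipeline):

import math
import numpy as np
from tensorflow.keras.utils import Sequence

class BatchGenerator(Sequence):
    def __init__(self, batch_size=1):
        # Placeholder data; keep members picklable for multiprocessing.
        self.a = np.array([[1], [2], [3]], dtype=np.int64)
        self.b = np.array([[4], [5], [6]], dtype=np.int64)
        self.labels = np.array([[1], [1], [0]], dtype=np.int64)
        self.batch_size = batch_size

    def __len__(self):
        return math.ceil(len(self.a) / self.batch_size)

    def __getitem__(self, index):
        s = slice(index * self.batch_size, (index + 1) * self.batch_size)
        # One (features, labels) batch; features keyed by name so the
        # model's call() can do input_dataset['a'].
        return {'a': self.a[s], 'b': self.b[s]}, self.labels[s]

With batches like this, call() receives a plain dict of arrays, so input_dataset['a'] works and the "'PrefetchDataset' object is not subscriptable" error goes away.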