Home > database >  Tensorflow UNAVAILABLE: Error reported from /job:worker/task:2
Tensorflow UNAVAILABLE: Error reported from /job:worker/task:2

Time:07-08

I'm trying to run the multi-worker MNIST CPU training (MultiWorkerMirroredStrategy) from the tensorflow website using Tensorflow 2.9.1 with python 3.9. Three workers are launched on the same host. Worker 1,2 terminate without error but Worker 0 reports the error:

UNAVAILABLE: Error reported from /job:worker/task:2: Task /job:worker/replica:0/task:2 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly. [type.googleapis.com/tensorflow.CoordinationServiceError='"\n\n\x06worker\x10\x02']

Code tf_mnist_multi_worker.py:

import json
import os
import tensorflow as tf
import numpy as np


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model


tf_config = json.loads(os.environ['TF_CONFIG'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()

PER_WORKER_BATCH_SIZE = 64
global_batch_size = PER_WORKER_BATCH_SIZE * len(tf_config['cluster']['worker'])
options = tf.data.Options()
options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
multi_worker_dataset = mnist_dataset(global_batch_size)
multi_worker_dataset = multi_worker_dataset.with_options(options)

with strategy.scope():
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = build_and_compile_cnn_model()

multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

if tf_config['task']['index'] == 0:
    multi_worker_model.save(".")

In three separate terminals the following three command sequences are executed:

Linux Terminal1 1:

export TF_CONFIG='{"cluster": {"worker": ["localhost:7000", "localhost:7001", "localhost:7002"]}, "task": {"type": "worker", "index": 0}}'
python tf_mnist_multi_worker.py

Linux Terminal 2:

export TF_CONFIG='{"cluster": {"worker": ["localhost:7000", "localhost:7001", "localhost:7002"]}, "task": {"type": "worker", "index": 1}}'
python tf_mnist_multi_worker.py

Linux Terminal 3:

export TF_CONFIG='{"cluster": {"worker": ["localhost:7000", "localhost:7001", "localhost:7002"]}, "task": {"type": "worker", "index": 2}}'
python tf_mnist_multi_worker.py

Full error log of worker 0:

2022-07-05 08:38:52.253059: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2022-07-05 08:38:52.253115: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.
2022-07-05 08:38:53.656584: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2022-07-05 08:38:53.656637: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2022-07-05 08:38:53.656668: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (blipp65.sdp.research.bell-labs.com): /proc/driver/nvidia/version does not exist
2022-07-05 08:38:53.657282: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-07-05 08:38:53.667990: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:7000, 1 -> localhost:7001, 2 -> localhost:7002}
2022-07-05 08:38:53.668098: I tensorflow/core/distributed_runtime/rpc/grpc_channel.cc:272] Initialize GrpcChannelCache for job worker -> {0 -> localhost:7000, 1 -> localhost:7001, 2 -> localhost:7002}
2022-07-05 08:38:53.668800: I tensorflow/core/distributed_runtime/rpc/grpc_server_lib.cc:438] Started server with target: grpc://localhost:7000
2022-07-05 08:38:55.567814: W tensorflow/core/framework/dataset.cc:768] Input of GeneratorDatasetOp::Dataset will not be optimized because the dataset does not implement the AsGraphDefInternal() method needed to apply optimizations.
Epoch 1/3
70/70 [==============================] - 8s 76ms/step - loss: 2.2734 - accuracy: 0.1854
Epoch 2/3
70/70 [==============================] - 5s 73ms/step - loss: 2.2141 - accuracy: 0.3222
Epoch 3/3
70/70 [==============================] - 5s 71ms/step - loss: 2.1473 - accuracy: 0.4274
WARNING:absl:Found untraced functions such as _jit_compiled_convolution_op while saving (showing 1 of 1). These functions will not be directly callable after loading.
2022-07-05 08:39:24.672522: E tensorflow/core/common_runtime/base_collective_executor.cc:249] BaseCollectiveExecutor::StartAbort UNAVAILABLE: Error reported from /job:worker/task:1: Task /job:worker/replica:0/task:1 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly. [type.googleapis.com/tensorflow.CoordinationServiceError='\"\n\n\x06worker\x10\x01']
2022-07-05 08:39:24.674058: E tensorflow/core/common_runtime/ring_alg.cc:290] Aborting RingReduce with UNAVAILABLE: Collective ops is aborted by: Error reported from /job:worker/task:1: Task /job:worker/replica:0/task:1 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly.
The error could be from a previous operation. Restart your program to reset. [type.googleapis.com/tensorflow.DerivedStatus='']
Traceback (most recent call last):
  File "./tf_mnist_multi_local/tf_mnist_multi_worker.py", line 59, in <module>
    multi_worker_model.save(".")
  File "./miniconda3/envs/tfmnist/lib/python3.9/site-packages/keras/utils/traceback_utils.py", line 67, in error_handler
    raise e.with_traceback(filtered_tb) from None
  File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 771, in get
    raise self._value
  File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 125, in worker
    result = (True, func(*args, **kwds))
  File "./miniconda3/envs/tfmnist/lib/python3.9/multiprocessing/pool.py", line 48, in mapstar
    return list(map(*args))
tensorflow.python.framework.errors_impl.UnavailableError: Collective ops is aborted by: Error reported from /job:worker/task:1: Task /job:worker/replica:0/task:1 heartbeat timeout. This indicates that the remote task has failed, got preempted, or crashed unexpectedly.
The error could be from a previous operation. Restart your program to reset. [Op:CollectiveReduceV2]

Any idea what may be wrong?

CodePudding user response:

It turns out that multi_worker_model.save() must be called in all workers even if the data of the non-zero workers is not required.

Here the full working code (notice the last 2 lines, which were added):

import json
import os
import tensorflow as tf
import numpy as np


def mnist_dataset(batch_size):
    (x_train, y_train), _ = tf.keras.datasets.mnist.load_data()
    # The `x` arrays are in uint8 and have values in the [0, 255] range.
    # You need to convert them to float32 with values in the [0, 1] range.
    x_train = x_train / np.float32(255)
    y_train = y_train.astype(np.int64)
    train_dataset = tf.data.Dataset.from_tensor_slices(
        (x_train, y_train)).shuffle(60000).repeat().batch(batch_size)
    return train_dataset

def build_and_compile_cnn_model():
    model = tf.keras.Sequential([
        tf.keras.layers.InputLayer(input_shape=(28, 28)),
        tf.keras.layers.Reshape(target_shape=(28, 28, 1)),
        tf.keras.layers.Conv2D(32, 3, activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dense(10)
    ])
    model.compile(
        loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True),
        optimizer=tf.keras.optimizers.SGD(learning_rate=0.001),
        metrics=['accuracy'])
    return model


tf_config = json.loads(os.environ['TF_CONFIG'])
strategy = tf.distribute.MultiWorkerMirroredStrategy()

PER_WORKER_BATCH_SIZE = 64
global_batch_size = PER_WORKER_BATCH_SIZE * len(tf_config['cluster']['worker'])

with strategy.scope():
    options = tf.data.Options()
    options.experimental_distribute.auto_shard_policy = tf.data.experimental.AutoShardPolicy.DATA
    multi_worker_dataset = mnist_dataset(global_batch_size)
    multi_worker_dataset = multi_worker_dataset.with_options(options)
    # Model building/compiling need to be within `strategy.scope()`.
    multi_worker_model = build_and_compile_cnn_model()

#multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)
multi_worker_model.fit(multi_worker_dataset, epochs=3, steps_per_epoch=70)

if tf_config['task']['index'] == 0:
    multi_worker_model.save(".")
else:
    multi_worker_model.save(f"./workertmp{tf_config['task']['index']}")
  • Related