Running two Tensorflow trainings in parallel using joblib and dask


I have the following code that runs two TensorFlow trainings in parallel, using Dask workers that run in Docker containers.

To that end, I do the following:

  • I use joblib.delayed to spawn the two training processes.
  • Within each process I wrap the fit/training logic in with joblib.parallel_backend('dask'):. Each training process then distributes its work across N Dask workers.

The problem is that I don't know whether the entire process is thread-safe. Are there any concurrency elements that I'm missing?

import joblib
import tensorflow as tf
from dask.distributed import Client
from scikeras.wrappers import KerasClassifier  # or the legacy tf.keras wrapper

# First, submit the function twice using joblib.delayed
delayed_funcs = [joblib.delayed(train)(sub_task) for sub_task in [123, 456]]
parallel_pool = joblib.Parallel(n_jobs=2)
parallel_pool(delayed_funcs)

# Second, the training function submitted for each sub-task
def train(sub_task):

    global client
    if client is None:
        print('connecting')
        client = Client()

    data = some_data_to_train

    # Third, run the fit itself across N Dask workers
    with joblib.parallel_backend('dask'):
        X = data[columns]
        y = data[label]

        niceties = dict(verbose=False)
        model = KerasClassifier(build_fn=build_layers,
                loss=tf.keras.losses.MeanSquaredError(), **niceties)
        model.fit(X, y, epochs=500, verbose=0)

CodePudding user response:

This is pure speculation, but one potential concurrency issue is the if client is None: part, where two processes could race to create a Client.

If this is resolved (e.g. by explicitly creating a client in advance), then the Dask scheduler will prioritize tasks by time of submission (unless a priority is explicitly assigned) and by the structure of the task graph (DAG); further details are available in the docs.
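For illustration, here is a minimal sketch of assigning priorities explicitly. It uses Client.submit, whose priority= keyword is part of dask.distributed (higher values are scheduled first); the train function here is a placeholder, not the questioner's actual code:

from dask.distributed import Client

client = Client()  # created explicitly, in advance, so there is no race

def train(sub_task):
    ...  # placeholder for the fit logic

# Higher-priority tasks are preferred when workers are contended;
# otherwise submission order and graph structure decide.
fut_a = client.submit(train, 123, priority=10)
fut_b = client.submit(train, 456, priority=1)
client.gather([fut_a, fut_b])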

CodePudding user response:

The question, as given, could easily be marked as "unclear" for SO. A couple of notes:

  • global client: makes the name client refer to a module-level variable rather than a local one. But the function runs in a separate process, so assigning the client there does not affect the parent process or the other subprocess.
  • if client is None: this raises a NameError, because client is never defined before the check; your code doesn't actually run as written.
  • client = Client(): this creates a brand-new local cluster in each subprocess, and each cluster assumes the machine's total resources are available to it, oversubscribing those resources.
  • Dask knows whether a client has already been created in the current process, but that doesn't help you here.

You must ask yourself: why are you creating processes for the two fits at all? Why not just let Dask figure out its parallelism, which is what it's meant for?
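As a minimal sketch of that simpler approach (assuming X, y and a hypothetical build_model helper stand in for the data preparation and KerasClassifier setup in the question):

from dask.distributed import Client
import joblib

client = Client()  # one client, created once, for the whole program

# Both fits run through the same pool of Dask workers;
# no extra processes are wrapped around them.
with joblib.parallel_backend('dask'):
    for sub_task in [123, 456]:
        model = build_model(sub_task)  # hypothetical helper
        model.fit(X, y, epochs=500, verbose=0)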

--

I think I must answer the question as phrased in your comment, which I advise you to add to the main question:

I need to launch two processes, using the same dask client, where each will train their respective models with N workers.

You have the following options:

  • create a client with a specific known address within your program or beforehand, then connect to it
  • create a default client Client() and get its address (e.g., client._scheduler_identity['address']) and connect to that
  • write a scheduler information file with client.write_scheduler_file and use that

You would then connect inside the function with

client = Client(address)

or

client = Client(scheduler_file=the_file_you_wrote)
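
Putting the scheduler-file option together with the original joblib structure, a minimal sketch (the file name and the fit body are placeholders; this is one possible arrangement, not a definitive implementation):

import joblib
from dask.distributed import Client

def train(sub_task, scheduler_file):
    # each subprocess attaches to the SAME scheduler
    client = Client(scheduler_file=scheduler_file)
    with joblib.parallel_backend('dask'):
        ...  # fit logic for this sub_task, as in the question

if __name__ == '__main__':
    client = Client()  # one cluster for everything
    client.write_scheduler_file('scheduler.json')

    delayed_funcs = [joblib.delayed(train)(t, 'scheduler.json')
                     for t in [123, 456]]
    joblib.Parallel(n_jobs=2)(delayed_funcs)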