Home > other >  Recursive Promise chaining with tasks depending on each other
Recursive Promise chaining with tasks depending on each other

Time:11-03

I was trying to solve a coding problem about recursion and async but I got a bit stuck on this, this is the problem:

Task

You have to execute a number of tasks. A task is just any function (usually async).

Some tasks can depend on each other. So they must wait until the tasks they depend on complete first.

You have to wait until all the tasks are completed and return their results.

Input

An object with task ids as keys and objects describing the tasks as values:

interface TaskDict {
  [taskId: string]: {
    dependencies: string[]; // an array of task ids.
    task: (...dependencyResults: any[]) => any;
  }
}

Output

A promise that resolves with an object with task ids as keys and task results as values:

interface TaskResultDict {
  [taskId: string]: (
    {
      status: 'resolved',
      value: any
    } |
    {
      status: 'failed',
      reason: any
    } |
    {
      status: 'skipped',
      unresolvedDependencies: string[]
    }
  );
}

Note that a task should not be executed if any of its dependencies were not resolved (e.g. failed or were skipped in their turn). In this case the status will be skipped.

The same skipped status should be if a dependency is circular. Yes, there can be this mistake in the input. Apart from this the input will always be valid (no need to write validation).

Example

const {deepStrictEqual} = require('assert');

const runTasks = (tasks: TaskDict): Promise<TaskResultDict> => {
  // TODO
};

const taskResults = await runTasks({
  a: {
    dependencies: [],
    task: () => Promise.resolve(4)
  },
  b: {
    dependencies: ['a', 'c'],
    task: async (a, c) => Math.sqrt(c * c - a * a)
  },
  c: {
    dependencies: [],
    task: () => new Promise((x) => setTimeout(x, 100)).then(() => 5)
  },
  d: {
    dependencies: [],
    task: () => Promise.reject('This will fail.')
  },
  e: {
    dependencies: ['d', 'a', 'f'],
    task: console.log
  },
  f: {
    dependencies: ['f'],
    task: () => console.log('Should never run - "f" depends on itself.')
  }
});

deepStrictEqual(taskResults, {
  a: {status: 'resolved', value: 4},
  b: {status: 'resolved', value: 3},
  c: {status: 'resolved', value: 5},
  d: {status: 'failed', reason: 'This will fail.'},
  e: {status: 'skipped', unresolvedDependencies: ['d', 'f']},
  f: {status: 'skipped', unresolvedDependencies: ['f']}
});

Current approach

So far I have made this approach, but the main problem is that the logic is working as far as I don't use async tasks to be resolved, in that case, the async flow is not working as I expect so the dependencies to be resolved before executing some task are not being resolved properly, do you have any clue?

const resolveDependency = async (
  taskId: string,
  task: (...dependencyResults: any[]) => any,
  dependencies: string[],
  results: TaskResultDict
): Promise<TaskResultDict> => {
  const unresolvedDependencies = Object.entries(results).filter(
    (result) =>
      dependencies.includes(result[0]) && result[1].status !== 'resolved'
  );
  try {
    if (unresolvedDependencies.length > 0) {
      return ({
        [taskId]: {
          status: 'skipped',
          unresolvedDependencies: unresolvedDependencies.map(
            (dependency) => dependency[0]
          ),
        },
      } as TaskResultDict);
    }
    const taskValue = await task(
      ...Object.entries(results)
        .filter((result) => dependencies.includes(result[0]))
        .map(
          (result) => (result[1] as { status: 'resolved'; value: any }).value
        )
    );
    return ({
      [taskId]: {
        status: 'resolved',
        value: taskValue,
      },
    } as TaskResultDict);
  } catch (error) {
    return ({
      [taskId]: {
        status: 'failed',
        reason: error,
      },
    } as TaskResultDict);
  }
};

const runTaskWithDependencies = async (
  tasks: TaskDict,
  taskId: string,
  results: TaskResultDict
): Promise<TaskResultDict> => {
  const taskDependencies = tasks[taskId].dependencies;
  const allDependenciesExecuted = Object.keys(results).length > 0 && Object.keys(results).every((taskId) =>
    taskDependencies.includes(taskId)
  );
  if (taskDependencies.includes(taskId)) {
    return {
      [taskId]: {
        status: 'skipped',
        unresolvedDependencies: [taskId],
      },
    };
  } else if (allDependenciesExecuted || taskDependencies.length === 0) {
    const taskResult = await resolveDependency(
      taskId,
      tasks[taskId].task,
      taskDependencies,
      results
    );
    return {
      ...results,
      ...taskResult,
    };
  } else {
    return (
      await Promise.all(
        taskDependencies
          .map(
            (dependency) =>
              runTaskWithDependencies(tasks, dependency, results)
          )
      )
    ).reduce((previous, current) => {
      return {
        ...previous,
        ...current,
      };
    }, {} as TaskResultDict);
  }
};

export const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
  const tasksIds = Object.keys(tasks);
  return await tasksIds.reduce(async (previous, current) => {
    const taskResult = await runTaskWithDependencies(
      tasks,
      current[0],
      await previous
    );
    return {
      ...previous,
      ...taskResult,
    };
  }, Promise.resolve(<TaskResultDict>{}));
};

CodePudding user response:

Tasks must be run in a specific order depending on their dependencies, but your runTasks function just runs them in the order returned by Object.keys(tasks).


One approach could be to run batches of runnable tasks until there's no tasks to run anymore, with a task being runnable if all its dependencies have be run already.

I've reworked your types a bit to ease the following functions (still compatible with your code):

interface Task {
  dependencies: string[];
  task: (...dependencyResults: any[]) => any;
}

interface TaskDict {
  [taskId: string]: Task
}

interface ResolvedTask {
  status: 'resolved',
  value: any;
}

interface FailedTask {
  status: 'failed';
  reason: any;
}

interface SkippedTask {
  status: 'skipped';
  unresolvedDependencies: string[];
}

type TaskResult = ResolvedTask | FailedTask | SkippedTask;

interface TaskResultDict {
  [taskId: string]: TaskResult;
}

Now, we need a function that runs one task depending on the existing results of previously run tasks if any (roughly similar to your runTaskWithDependencies):

const runTask = async (task: Task, results: TaskResultDict): Promise<TaskResult> => {
  const unresolvedDependencies = task.dependencies.filter((dep) => !results[dep] || results[dep].status !== 'resolved');
  if (unresolvedDependencies.length > 0) {
    return { status: 'skipped', unresolvedDependencies };
  }
  const dependencyResults = task.dependencies.map((dep) => (results[dep] as ResolvedTask).value);
  try {
    const value = await task.task(...dependencyResults);
    return { status: 'resolved', value };
  } catch (reason) {
    return { status: 'failed', reason };
  }
};

Finally, we run batches of runnable tasks until there's no tasks to run anymore:

const runTasks = async (tasks: TaskDict): Promise<TaskResultDict> => {
  let remainingTasks: [string, Task][] = Object.entries(tasks);
  let results: TaskResultDict = {};
  while (remainingTasks.length > 0) {
    let runnableTasks = remainingTasks.filter(([_, { dependencies: deps }]) => deps.every((dep) => results.hasOwnProperty(dep)));
    if (runnableTasks.length === 0) {
      // there are remaining tasks but none of them are runnable: dependency issues, all will be skipped
      runnableTasks = remainingTasks;
    }
    const newResultPromises = runnableTasks.map(([taskId, task]) => runTask(task, results).then((result) => ({ [taskId]: result })));
    // await our batch:
    const newResults = await Promise.all(newResultPromises);
    // gather the results:
    results = Object.assign(results, ...newResults);
    remainingTasks = remainingTasks.filter(([taskId]) => !results.hasOwnProperty(taskId));
  }
  return results;
};

The graph approach proposed by enter image description here

  1. Write function that will check if there are circular dependancies, it can be a simple recursive function. You should have a Set where you store already traversed nodes, and if currently traversed node is in Set that means that you have a circular dependancy. You can for example in that case throw an error "Circular dependancy detected".

  2. Now, write function to traverse the graph and chain promises, but from the bottom, from "OUTPUT" node, because in that situation you can chain Promises more easily. At the end, that function should return a single Promise so you can just do:

const chainPromises = async (graph) => {...} // Start everything await chainPromises();

For example:

  1. First traversed node is OUTPUT node and it has two dependancies (D and E) so it should wait like await Promise.all(promiseFromD(), promiseFromE())
  2. Second traversed node is D node, it has two dependancies (A and B) so it should wait like await Promise.all(promiseFromA(), promiseFromB()) and so on...
  • Related