I am using the Ray library to process a bunch of tasks in parallel. I add tasks to be processed, process them all together, and then update a queue, which limits me to the speed of the slowest task at each level. Is there a way to dynamically update the queue every time a task finishes?
Here's some code:
while len(queue) > 0:
    # add some tasks to be processed from queue
    tasks = [x for x in queue if not some_condition(x)]
    # remove tasks from queue
    queue = [x for x in queue if x not in tasks]
    # call Ray remote function on each node in the tasks
    outputs = [self.process_nodes.remote(node) for node in tasks]
    # go through outputs, update queue given children and relevant attributes and iterate.
    out = ray.get(outputs)
    do_stuff(out)
In the code above, ray.get(outputs) waits for every node in the tasks list to finish before it can move on. Is there a way to get a task from outputs as soon as it finishes, process it, update the queue, and keep processing the remaining nodes?
CodePudding user response:
You can use ray.wait() to check which tasks are finished and which are still running.
You can call it in a loop until you have all the results.
import time
import ray

@ray.remote
def function(i):
    time.sleep(i)
    return i

print('create tasks')
# create tasks
all_tasks = [function.remote(i) for i in range(4)]

print('wait for results')
# run loop to get all results
while all_tasks:
    # block until at least one task finishes; ray.wait returns (finished, still pending)
    finished, all_tasks = ray.wait(all_tasks, num_returns=1, timeout=None)
    for task in finished:
        result = ray.get(task)
        print('result:', result)
    print('len(all_tasks):', len(all_tasks))
Result:
create tasks
wait for results
result: 0
len(all_tasks): 3
result: 1
len(all_tasks): 2
result: 2
len(all_tasks): 1
result: 3
len(all_tasks): 0
If you use num_returns=2, each call waits for two results (num_returns must not exceed the number of tasks still pending), and you get:
create tasks
wait for results
result: 0
result: 1
len(all_tasks): 2
result: 2
result: 3
len(all_tasks): 0
If you use timeout=0.5, each call waits at most 0.5 s for results, so finished may be empty on some iterations and you may get output like this:
create tasks
wait for results
result: 0
len(all_tasks): 3
len(all_tasks): 3
len(all_tasks): 3
result: 1
len(all_tasks): 2
len(all_tasks): 2
result: 2
len(all_tasks): 1
len(all_tasks): 1
len(all_tasks): 1
result: 3
len(all_tasks): 0
Doc: ray.wait and Fetching results