Ruby 3 introduced Fiber.schedule to dispatch async tasks concurrently.
Similar to what's being asked in this question (which is about threaded concurrency), I would like a way to start multiple concurrent tasks on the fiber scheduler and, once they have all been scheduled, wait for their combined result, roughly equivalent to Promise.all in JavaScript.
I can come up with this naive way:
require 'async'

def io_work(t)
  sleep t
  :ok
end

Async do
  results = []
  [0.1, 0.3, 'cow'].each_with_index do |t, i|
    n = i + 1
    Fiber.schedule do
      puts "Starting fiber #{n}\n"
      result = io_work t
      puts "Done working for #{t} seconds in fiber #{n}"
      results << [n, result]
    rescue
      puts "Execution failed in fiber #{n}"
      results << [n, :error]
    end
  end
  # await combined results by polling until every fiber has reported
  sleep 0.1 until results.size >= 3
  puts "Results: #{results}"
end
Is there a simpler construct that will do the same?
CodePudding user response:
Since Async tasks are already scheduled, I am not sure you need all of that.
If you just want to wait for all the items to finish, you can use an Async::Barrier.
Example:
require 'async'
require 'async/barrier'

def io_work(t)
  sleep t
  :ok
end

Async do
  barrier = Async::Barrier.new
  results = []
  [1, 0.3, 'cow'].each.with_index(1) do |data, idx|
    barrier.async do
      results << begin
        puts "Starting task #{idx}\n"
        result = io_work data
        puts "Done working for #{data} seconds in task #{idx}"
        [idx, result]
      rescue
        puts "Execution failed in task #{idx}"
        [idx, :error]
      end
    end
  end
  barrier.wait
  puts "Results: #{results}"
end
Based on the sleep values, this will output:
Starting task 1
Starting task 2
Starting task 3
Execution failed in task 3
Done working for 0.3 seconds in task 2
Done working for 1 seconds in task 1
Results: [[3, :error], [2, :ok], [1, :ok]]
The barrier.wait call blocks until all the asynchronous tasks are complete. Without it, the output would look like:
Starting task 1
Starting task 2
Starting task 3
Execution failed in task 3
Results: [[3, :error]]
Done working for 0.3 seconds in task 2
Done working for 1 seconds in task 1
CodePudding user response:
I wasn't too happy with the ergonomics of the solution, so I made the gem fiber-collector to address it.
Disclaimer: I'm describing a library I am the author of.
Example usage in the scenario from the question:
require 'async'
require 'fiber/collector'

def io_work(t)
  sleep t
  :ok
end

Async do
  Fiber::Collector.schedule { io_work(1) }.and { io_work(0.3) }.all
end.wait
# => [:ok, :ok]

Async do
  Fiber::Collector.schedule { io_work(1) }.and { io_work(0.3) }.and { io_work('cow') }.all
end.wait
# => raises error