Ruby 3 collecting results from multiple scheduled fibers

Time:12-09

Ruby 3 introduced Fiber.schedule to dispatch async tasks concurrently.

Similar to what's being asked in this question (which is about threaded concurrency), I would like a way to start multiple concurrent tasks on the fiber scheduler and, once they have all been scheduled, wait for their combined result, roughly equivalent to Promise.all in JavaScript.

I can come up with this naive way:

require 'async'

def io_work(t)
  sleep t
  :ok
end

Async do
  results = []

  [0.1, 0.3, 'cow'].each_with_index do |t, i|
    n = i + 1
    Fiber.schedule do
      puts "Starting fiber #{n}\n"
      result = io_work t
      puts "Done working for #{t} seconds in fiber #{n}"
      results << [n, result]
    rescue
      puts "Execution failed in fiber #{n}"
      results << [n, :error]
    end
  end

  # await combined results
  sleep 0.1 until results.size >= 3

  puts "Results: #{results}"
end

Is there a simpler construct that will do the same?

CodePudding user response:

Since Async tasks are already scheduled, I am not sure you need all of that.

If you just want to wait for all the items to finish, you can use an Async::Barrier.

Example:

require 'async'
require 'async/barrier'


def io_work(t)
  sleep t
  :ok
end

Async do
  barrier = Async::Barrier.new
  results = []
  [1, 0.3, 'cow'].each.with_index(1) do |data, idx|
    barrier.async do 
      results << begin
        puts "Starting task #{idx}\n"
        result = io_work data
        puts "Done working for #{data} seconds in task #{idx}"
        [idx,result]
      rescue
        puts "Execution failed in task #{idx}"
        [idx, :error]
      end          
    end 
  end
  barrier.wait
  puts "Results: #{results}"
end

Based on the sleep values, this will output:

Starting task 1
Starting task 2
Starting task 3
Execution failed in task 3
Done working for 0.3 seconds in task 2
Done working for 1 seconds in task 1
Results: [[3, :error], [2, :ok], [1, :ok]]
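
The results above are appended in completion order, which is why task 3 comes first. If input order matters (as it would with Promise.all), the index stored in each pair can be used to restore it afterwards. A minimal sketch using only the array printed above:

```ruby
# Results as collected above: [task index, outcome] pairs in completion order.
results = [[3, :error], [2, :ok], [1, :ok]]

# Sort by the task index, then drop the index to keep only the outcomes.
ordered = results.sort_by(&:first).map(&:last)

puts "Ordered: #{ordered}"
```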

barrier.wait blocks until all the asynchronous tasks are complete. Without it, the results would be printed before the slower tasks finish, and the output would look like:

Starting task 1
Starting task 2
Starting task 3
Execution failed in task 3
Results: [[3, :error]]
Done working for 0.3 seconds in task 2
Done working for 1 seconds in task 1

CodePudding user response:

I wasn't too happy with the ergonomics of the solution, so I made the gem fiber-collector to address it.

Disclaimer: I'm describing a library I am the author of.

Example usage in the scenario from the question:

require 'async'
require 'fiber/collector'

def io_work(t)
  sleep t
  :ok
end

Async do
  Fiber::Collector.schedule { io_work(1) }.and { io_work(0.3) }.all
end.wait
# => [:ok, :ok]


Async do
  Fiber::Collector.schedule { io_work(1) }.and { io_work(0.3) }.and { io_work('cow') }.all
end.wait
# => raises error