Await for a Sequence of Futures with timeout without failing on TimeoutException

Time:11-25

I have a sequence of Scala Futures of the same type.

After a limited amount of time, I want to get a result for the entire sequence, where some futures may have succeeded, some may have failed, and some may not have completed yet. The futures that have not completed should be treated as failed.

I don't want to Await each future sequentially.

I looked at this question: Scala waiting for sequence of futures, and tried to use the solution from there, namely:

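  import scala.concurrent.{Await, ExecutionContext, Future}
  import scala.concurrent.duration._
  import scala.util.{Failure, Success}

  // Wrap each result in a Try so that one failed future does not fail the whole sequence
  private def lift[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    futures.map(_.map { Success(_) }.recover { case t => Failure(t) })

  def waitAll[T](futures: Seq[Future[T]])(implicit ex: ExecutionContext) =
    Future.sequence(lift(futures))

  val futures: Seq[Future[MyObject]] = ...
  val segments = Await.result(waitAll(futures), waitTimeoutMillis.millis)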

But I'm still getting a TimeoutException, I guess because some of the futures haven't completed yet. That answer also states:

Now Future.sequence(lifted) will be completed when every future is completed, and will represent successes and failures using Try.

But I want my Future to be completed after the timeout has passed, not when every future in the sequence has completed. What else can I do?

CodePudding user response:

If I used a raw Future (rather than some IO monad which has this functionality built in, or some Akka utils made for exactly that), I would hack together a utility like:

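import scala.concurrent.{blocking, ExecutionContext, Future}

// make each separate future time out
object FutureTimeout {
  // separate EC for waiting, so sleeping threads do not starve the main EC
  private val timeoutEC: ExecutionContext = ...

  // a future that never succeeds: it fails once `delay` milliseconds have passed
  private def timeout[T](delay: Long): Future[T] = Future {
    blocking {
      Thread.sleep(delay)
    }
    throw new Exception("Timeout")
  }(timeoutEC)

  // race the original future against the timeout; whichever completes first wins
  def apply[T](fut: Future[T], delay: Long)(
    implicit ec: ExecutionContext
  ): Future[T] = Future.firstCompletedOf(Seq(
    fut,
    timeout(delay)
  ))
}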

and then

Future.sequence(
  futures
    .map(FutureTimeout(_, delay))
    // lift each future into a Try so that a timeout or failure does not fail the whole sequence
    .map(_.map(Success(_)).recover { case e => Failure(e) })
)

Since each wrapped future completes after at most delay, we can collect them into one result right after that.
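To make the pieces above concrete, here is a minimal end-to-end sketch. It is not the exact code from this answer: the object name CollectWithTimeoutSketch, the helper names withTimeout and collectAll, the daemon cached thread pool used as the waiting EC, and the sample futures in demo are all assumptions made for illustration. It also uses Future.transform(Success(_)) (available since Scala 2.12) as a shorter equivalent of map plus recover.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeoutException}
import scala.concurrent.{blocking, Await, ExecutionContext, Future}
import scala.concurrent.duration._
import scala.util.{Success, Try}

object CollectWithTimeoutSketch {
  // Assumption: a cached pool of daemon threads plays the role of the dedicated waiting EC.
  private val timeoutEC: ExecutionContext = ExecutionContext.fromExecutor(
    Executors.newCachedThreadPool(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, "timeout-waiter")
        t.setDaemon(true)
        t
      }
    })
  )

  // Never succeeds: fails with a TimeoutException once `delayMillis` have passed.
  private def timeout[T](delayMillis: Long): Future[T] = Future[T] {
    blocking(Thread.sleep(delayMillis))
    throw new TimeoutException(s"No result after $delayMillis ms")
  }(timeoutEC)

  // Whichever of the two futures completes first decides the outcome.
  def withTimeout[T](fut: Future[T], delayMillis: Long)(
    implicit ec: ExecutionContext
  ): Future[T] = Future.firstCompletedOf(Seq(fut, timeout[T](delayMillis)))

  // Every element is lifted into a Try, so the resulting future itself never fails.
  def collectAll[T](futures: Seq[Future[T]], delayMillis: Long)(
    implicit ec: ExecutionContext
  ): Future[Seq[Try[T]]] =
    Future.sequence(futures.map(f => withTimeout(f, delayMillis).transform(Success(_))))

  // One fast success, one fast failure, one future that is too slow.
  def demo(): Seq[Try[Int]] = {
    import ExecutionContext.Implicits.global
    val futures = Seq(
      Future(1),
      Future[Int](throw new IllegalStateException("boom")),
      Future { blocking(Thread.sleep(2000)); 3 }
    )
    // The outer Await only needs a little headroom on top of the per-future delay.
    Await.result(collectAll(futures, 300), 2.seconds)
  }
}
```

In this sketch, demo() should return a success for the first future, the original IllegalStateException for the second, and a TimeoutException for the third, all within roughly the 300 ms delay.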

You have to remember, though, that no matter how you trigger a timeout, you have no guarantee that the timed-out Future stops executing. It could run on and on on some thread somewhere; it's just that you wouldn't wait for the result. firstCompletedOf just makes this race more explicit.
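A small experiment can illustrate this point. The sketch below is only an illustration: the object name StillRunningDemo, the 100 ms and 500 ms delays, and the use of the global execution context for both futures are assumptions. It races a slow future against a failing timer and then checks whether the slow future's side effect still happened.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{blocking, Await, Future, TimeoutException}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object StillRunningDemo {
  // Returns (did the race fail with the timer?, did the slow future still run to the end?)
  def run(): (Boolean, Boolean) = {
    val sideEffectHappened = new AtomicBoolean(false)

    // The "real" work: takes 500 ms and leaves a visible side effect.
    val slow: Future[String] = Future {
      blocking(Thread.sleep(500))
      sideEffectHappened.set(true)
      "late"
    }

    // The timer: fails after 100 ms, well before `slow` is done.
    val timer: Future[String] = Future {
      blocking(Thread.sleep(100))
      throw new TimeoutException("Timeout")
    }

    val raced = Future.firstCompletedOf(Seq(slow, timer))
    val raceFailed = Try(Await.result(raced, 1.second)).isFailure

    // Nobody cancelled `slow`; waiting for it shows that it kept running after losing the race.
    Await.ready(slow, 2.seconds)
    (raceFailed, sideEffectHappened.get())
  }
}
```

Both flags should come back true: the caller saw a timeout, yet the losing future still finished its work in the background.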

Some other utilities (e.g. Cats Effect IO) allow you to cancel computations (which is used in races like this one), but you still have to remember that the JVM cannot arbitrarily "kill" a running thread, so the cancellation would happen after one stage of computation is completed and before the next one is started (e.g. between .maps or .flatMaps).

If you aren't afraid of adding external deps there are other (and more reliable, as Thread.sleep is just a temporary ugly hack) ways of timing out a Future, like Akka utils. See also other questions like this.
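If you want to stay dependency-free but avoid parking a thread in Thread.sleep, one possible alternative is to fail a Promise from a java.util.concurrent.ScheduledExecutorService. This is a different technique from the FutureTimeout object above, shown only as a sketch: the object name ScheduledTimeout, the thread name, and the single daemon scheduler thread are assumptions.

```scala
import java.util.concurrent.{Executors, ThreadFactory, TimeUnit, TimeoutException}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

object ScheduledTimeout {
  // One daemon thread is enough: its only job is to fail promises when their time is up.
  private val scheduler = Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "future-timeout")
      t.setDaemon(true)
      t
    }
  })

  def apply[T](fut: Future[T], delay: FiniteDuration)(
    implicit ec: ExecutionContext
  ): Future[T] = {
    val promise = Promise[T]()

    // Schedule the failure; no thread sleeps while waiting for it.
    val task = scheduler.schedule(new Runnable {
      def run(): Unit = {
        promise.tryFailure(new TimeoutException(s"No result after $delay"))
        ()
      }
    }, delay.toMillis, TimeUnit.MILLISECONDS)

    // If the original future finishes first, drop the scheduled failure and pass the result on.
    fut.onComplete { result =>
      task.cancel(false)
      promise.tryComplete(result)
    }

    promise.future
  }
}
```

It can be dropped into the same place as the wrapper above, for example futures.map(ScheduledTimeout(_, 500.millis)). As with every approach here, the original future is not cancelled when the timeout wins; only its result is ignored.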

CodePudding user response:

Here is a solution using Monix:

import monix.eval.Task
import monix.execution.Scheduler
import scala.concurrent.duration._ // for 500.millis
import scala.util.{Failure, Success}

val timeoutScheduler = Scheduler.singleThread("timeout") //it's safe to use single thread here because timeout tasks are very fast

def sequenceDiscardTimeouts[T](tasks: Task[T]*): Task[Seq[T]] = {
  Task
    .parSequence(
      tasks
        .map(t =>
          t.map(Success.apply) // Map to success so we can collect the value
            .timeout(500.millis)
            .executeOn(timeoutScheduler) //This is needed to run timeouts in a dedicated scheduler that won't be blocked by "blocking"/io work if you have any
            .onErrorRecoverWith { ex =>
              println("timed-out")
              Task.pure(Failure(ex)) //It's assumed that any error is a timeout. It's possible to "catch" just timeout exception here
            }
        )
    )
    .map { res =>
      res.collect { case Success(r) => r }
    }
}

Testing code

import scala.util.Random

implicit val mainScheduler = Scheduler.fixedPool(name = "main", poolSize = 10)


def slowTask(msg: String) = {
  Task.sleep(Random.nextLong(1000).millis) //Sleep here to emulate a slow task
    .map { _ =>
      msg
    }
}


val app = sequenceDiscardTimeouts(
  slowTask("1"),
  slowTask("2"),
  slowTask("3"),
  slowTask("4"),
  slowTask("5"),
  slowTask("6")
)

val started: Long = System.currentTimeMillis()
app.runSyncUnsafe().foreach(println)
println(s"Done in ${System.currentTimeMillis() - started} millis")

The output differs from run to run, but it should look like the following:

timed-out
timed-out
timed-out
3
4
5
Done in 564 millis

Please note the use of two separate schedulers. This ensures that timeouts fire even if the main scheduler is busy with business logic. You can test it by reducing poolSize for the main scheduler.
