I am using scala futures to poll API status . Continue to poll till return success or fail. How ever when any batch fails, it should throw error and stop the program.
I am not able throw error either by throw ( exception) or Future.successful(false) or Future.failed(new Exception("error")) .
package FutureScheduler
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
object delayTest extends App {
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext.Implicits.global
def getStatus(batchId: String): String = {
print(s" $batchId ${System.currentTimeMillis()} continue \n")
batchId match {
case "batch1" => "success"
case "batch2" => "failed"
case "batch2" => "success"
}
}
def waitTask(batch: (String, Int)
): Future[Boolean] =
Delayed(x.seconds)(getStatus(batch._1)).flatMap {
case "success" =>
print(s"\n${batch._1} succeeded for ${batch._2}")
Future.successful(true)
case "failed" =>
print(s"\n${batch._1} failed for ${batch._2}")
Future.failed(new Exception("error"))
throw new RuntimeException("errored")
case _ => {
waitTask(batch)
}
}
val statusList = List(Some("batch1", 123), None, Some("batch2", 124)).flatten
val y = 1
val x = 5
try {
Await.ready(Future.traverse(statusList)((waitTask _)), y.minutes)
}
catch {
case e: Exception => println("caught error")
}
print("\nbye now")
}
import scala.concurrent.duration._
import scala.concurrent.{Future, Promise}
object Delayed {
import java.util.{Timer, TimerTask}
private val timer = new Timer
def apply[T](delay: Duration)(task: => T): Future[T] = {
val promise = Promise[T]()
val tt = new TimerTask {
override def run(): Unit = promise.success(task)
}
timer.schedule(tt, delay.toMillis)
promise.future
}
}
CodePudding user response:
The throw
is happening inside the Future
returned by Delayed
so it will be caught by that Future
.
You need to turn Await.ready
into Await.result
and then look at the value that it returns to get the result of the test.