I have a stream with the following structure:
val source = Source(1 to 10)
val flow1 = Flow[Int].mapAsyncUnordered(2) { x =>
  if (x != 7) Future.successful(x)
  else Future.failed(new Exception(s"$x has failed"))
}
val flow2 = Flow[Int].mapAsyncUnordered(2) { x =>
  if (x != 4) Future.successful(x)
  else Future.failed(new Exception(s"$x has failed"))
}
val sink = Sink.fold(List.empty[Int])((xs, x: Int) => x :: xs)
val errorSink = Sink.fold(List.empty[Exception])((errs, err: Exception) => err :: errs)
My question: how should I construct the divertTo call so that all exceptions are sent to errorSink? Any suggestion on how to get an error object that records which stage failed would be helpful.
CodePudding user response:
I would recommend modelling your errors as a proper type so that you have, for instance, a flow of Either[CustomErrorType, Int] elements. You can then use divertTo with a predicate that checks whether each element is a Left or a Right.
Alternatively, use recover in combination with this.
See this interesting article: https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12
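As a rough illustration of the Either-plus-divert idea, here is a minimal sketch, assuming Akka 2.6+ (where an implicit ActorSystem provides the materializer). StageError, stage, diverting and the object name are hypothetical names made up for this example, not part of Akka; divertToMat is used rather than divertTo so the collected errors are reachable through the materialized value.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import scala.concurrent.{ ExecutionContext, Future }

object DivertEitherSketch {
  // Hypothetical error type: records the stage name and the element that failed.
  final case class StageError(stage: String, element: Int, cause: Throwable)

  // A can-fail async stage whose failures are emitted as Left instead of failing the stream.
  def stage(name: String, bad: Int)(implicit ec: ExecutionContext): Flow[Int, Either[StageError, Int], NotUsed] =
    Flow[Int].mapAsyncUnordered(2) { x =>
      val fut: Future[Int] =
        if (x != bad) Future.successful(x)
        else Future.failed(new Exception(s"$x has failed"))
      fut
        .map[Either[StageError, Int]](Right(_))
        .recover { case ex => Left(StageError(name, x, ex)) }
    }

  // Keeps only the Left side; materializes the errors once the stream completes.
  val errorSink: Sink[Either[StageError, Int], Future[Seq[StageError]]] =
    Flow[Either[StageError, Int]]
      .collect { case Left(err) => err }
      .toMat(Sink.seq[StageError])(Keep.right)

  // Diverts Lefts to errorSink and unwraps the Rights, so downstream sees plain Ints again.
  def diverting(name: String, bad: Int)(implicit ec: ExecutionContext): Flow[Int, Int, Future[Seq[StageError]]] =
    stage(name, bad)
      .divertToMat(errorSink, _.isLeft)(Keep.right)
      .collect { case Right(value) => value }

  // Runs both stages and returns (all diverted errors, elements that passed both stages).
  def run()(implicit system: ActorSystem): Future[(Seq[StageError], Seq[Int])] = {
    import system.dispatcher
    val ((errs1, errs2), oks) =
      Source(1 to 10)
        .viaMat(diverting("flow1", 7))(Keep.right)
        .viaMat(diverting("flow2", 4))(Keep.both)
        .toMat(Sink.seq[Int])(Keep.both)
        .run()
    for { e1 <- errs1; e2 <- errs2; ok <- oks } yield (e1 ++ e2, ok)
  }
}
```

Because each StageError carries the stage name, the error side can tell which stage rejected an element. The order of the surviving elements is not guaranteed, since mapAsyncUnordered is used.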
CodePudding user response:
Future encodes both asynchronicity and can-fail. You'll need to separate the two.
Try, for instance, is an encoding of can-fail.
Meanwhile, mapAsyncUnordered only emits successes (you can use a supervision strategy to decide not to fail the stream on a failed future, but that will drop the failures rather than emit them).
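To make the point about supervision concrete, here is a small sketch, assuming Akka 2.6+ with an implicit ActorSystem supplying the materializer. The object and value names are made up for illustration. It attaches the resuming decider to the question's first stage: the stream no longer fails on 7, but the failure itself never appears downstream.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ ActorAttributes, Supervision }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import scala.concurrent.Future

object ResumeDropsFailures {
  // Same can-fail step as in the question, but supervised to resume on failure.
  val resuming: Flow[Int, Int, NotUsed] =
    Flow[Int]
      .mapAsyncUnordered(2) { x =>
        if (x != 7) Future.successful(x)
        else Future.failed(new Exception(s"$x has failed"))
      }
      .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

  // Completes with the elements that survived; the failed element is silently dropped.
  def run()(implicit system: ActorSystem): Future[Seq[Int]] =
    Source(1 to 10).via(resuming).runWith(Sink.seq)
}
```

Nothing in the result says that 7 failed or why, which is the reason for lifting the failure into a value such as Try before diverting.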
It seems that you want to accumulate a list of failures (given the use of Sink.fold). Since that list of failures is only accessible to the outside world through the materialized value, you'll want to use divertToMat instead of divertTo.
From this, the logical solution is:
import akka.stream.scaladsl.{ Flow, Keep, Sink }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

// Returns a future which is only a failure on fatal exceptions
def liftToFutTry[T](fut: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
  fut
    .map[Try[T]](Success(_))
    .recover { case ex => Failure(ex) }

// for some reason we want a List[Exception] rather than List[Throwable]
val errorSink: Sink[Try[Int], Future[List[Exception]]] =
  Flow[Try[Int]]
    // keep only failures whose cause is an Exception; anything else is dropped
    .collect { case Failure(ex: Exception) => ex }
    .toMat(Sink.fold[List[Exception], Exception](Nil) { (exes, ex) => ex :: exes })(Keep.right)

// materializes as a future of the exceptions which failed in mapAsyncUnordered
// (liftToFutTry needs an implicit ExecutionContext in scope, e.g. system.dispatcher)
val flow1: Flow[Int, Int, Future[List[Exception]]] =
  Flow[Int]
    .mapAsyncUnordered(2) { x =>
      val fut =
        if (x != 7) Future.successful(x)
        else Future.failed(new Exception(s"$x has failed (equaled 7)"))
      liftToFutTry(fut)
    }
    .divertToMat(errorSink, _.isFailure)(Keep.right) // propagate the failures
    .map { successfulTry => successfulTry.get } // only successes reach this point
If you have two Flows like this (flow2 being built the same way as flow1) and you want to compose them, you'd do:
// materialized value is (list of failures from flow1, list of failures from flow2)
val both: Flow[Int, Int, (Future[List[Exception]], Future[List[Exception]])] =
  flow1
    .viaMat(flow2)(Keep.both)

// materialized value is:
// (
//   (
//     list of failures from flow1,
//     list of failures from flow2
//   ),
//   list of ints which passed through both flow1 and flow2
// )
both.toMat(sink)(Keep.both) : Sink[Int, ((Future[List[Exception]], Future[List[Exception]]), Future[List[Int]])]
There are other ways to encode can-fail: e.g. you could use Either from the standard library.
Accumulating a Future[List[_]] may be questionable; note that the Futures won't be complete until the stream finishes.