How to divert Future failure in Akka stream to a separate sink?

Time:08-20

I have a stream with the following structure:

val source = Source(1 to 10)

val flow1 = Flow[Int].mapAsyncUnordered(2) { x =>
  if (x != 7) Future.successful(x)
  else Future.failed(new Exception(s"$x has failed"))
}

val flow2 = Flow[Int].mapAsyncUnordered(2) { x =>
  if (x != 4) Future.successful(x)
  else Future.failed(new Exception(s"$x has failed"))
}

val sink = Sink.fold(List.empty[Int])((xs, x: Int) => x :: xs)
val errorSink = Sink.fold(List.empty[Exception])((errs, err: Exception) => err :: errs)

My question: How should I construct the divertTo function to send all exceptions to errorSink? Any suggestion on how to get an error object that records the stage in which the failure occurred would also be helpful.

CodePudding user response:

I would recommend modelling your errors as a proper type, so that you have, for instance, a Flow[Either[CustomErrorType, Int]]. You can then use divertTo with a predicate that checks whether an element is a Left or a Right, possibly in combination with recover.

See this interesting article: https://bszwej.medium.com/akka-streams-error-handling-7ff9cc01bc12
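
As a rough illustration of the Either approach, here is a minimal sketch. It assumes Akka 2.6+ (where an implicit ActorSystem provides the materializer); StageError, tagged, and EitherDivertSketch are hypothetical names made up for this example and are not part of Akka. Tagging each Left with a stage name is one possible way to record where a failure happened.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Flow, Keep, Sink, Source }
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._

object EitherDivertSketch {
  // Hypothetical error type: records which stage produced the failure
  final case class StageError(stage: String, cause: Throwable)

  // Hypothetical helper: turns a failed Future into a successful Future holding a Left
  def tagged(stage: String)(fut: Future[Int])(
      implicit ec: ExecutionContext): Future[Either[StageError, Int]] =
    fut
      .map(x => Right(x): Either[StageError, Int])
      .recover { case ex => Left(StageError(stage, ex)) }

  // Materializes as (errors diverted from flow1, values that passed through)
  def run()(implicit system: ActorSystem): (Future[Seq[StageError]], Future[Seq[Int]]) = {
    import system.dispatcher

    val flow1: Flow[Int, Either[StageError, Int], NotUsed] =
      Flow[Int].mapAsyncUnordered(2) { x =>
        tagged("flow1") {
          if (x != 7) Future.successful(x)
          else Future.failed(new Exception(s"$x has failed"))
        }
      }

    // Keeps only the Left side of whatever is diverted here
    val errorSink: Sink[Either[StageError, Int], Future[Seq[StageError]]] =
      Flow[Either[StageError, Int]]
        .collect { case Left(err) => err }
        .toMat(Sink.seq)(Keep.right)

    Source(1 to 10)
      .via(flow1)
      .divertToMat(errorSink, _.isLeft)(Keep.right) // Lefts leave the main path
      .collect { case Right(x) => x }               // only Rights remain
      .toMat(Sink.seq)(Keep.both)
      .run()
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("either-divert")
    val (errors, values) = run()
    println(Await.result(errors, 5.seconds).map(e => s"${e.stage}: ${e.cause.getMessage}"))
    println(Await.result(values, 5.seconds).sorted)
    system.terminate()
  }
}
```

Because mapAsyncUnordered does not preserve order, the sketch sorts the values before printing them.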

CodePudding user response:

Future encodes both asynchronicity and can-fail. You'll need to separate the asynchronicity and can-fail.

Try, for instance, is an encoding of can-fail.

Meanwhile, mapAsyncUnordered only emits successes. You can use a supervision strategy to decide not to fail the stream on a failed future, but that will drop the failures rather than emit them.
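
To make the point about supervision concrete, here is a small sketch, assuming Akka 2.6+ (implicit ActorSystem as materializer). The object name ResumeDropsFailures is made up for the example. With a resuming decider the stream keeps running, but the failed element simply disappears: nothing reaches any sink for it.

```scala
import akka.actor.ActorSystem
import akka.stream.{ ActorAttributes, Supervision }
import akka.stream.scaladsl.{ Flow, Sink, Source }
import scala.concurrent.{ Await, Future }
import scala.concurrent.duration._

object ResumeDropsFailures {
  // Runs 1 to 10 through a flow in which the future for 7 fails
  def run()(implicit system: ActorSystem): Future[Seq[Int]] = {
    val resuming =
      Flow[Int]
        .mapAsyncUnordered(2) { x =>
          if (x != 7) Future.successful(x)
          else Future.failed(new Exception(s"$x has failed"))
        }
        // Resume instead of failing the stream when a future fails
        .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider))

    Source(1 to 10).via(resuming).runWith(Sink.seq)
  }

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("resume-demo")
    // 7 is missing from the result, and its exception is not available anywhere
    println(Await.result(run(), 5.seconds).sorted)
    system.terminate()
  }
}
```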

It seems that you want to accumulate a list of failures (given the use of Sink.fold). Since that list of failures is only accessible to the outside world through the materialized value, you'll want to use divertToMat instead of divertTo.

From this, the logical solution is:

import akka.stream.scaladsl.{ Flow, Keep, Sink }
import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }

// Returns a future which is only a failure on fatal exceptions
def liftToFutTry[T](fut: Future[T])(implicit ec: ExecutionContext): Future[Try[T]] =
  fut.map(Success(_))
    .recoverWith {
      case ex => Future.successful(Failure(ex))
    }

// for some reason we want a List[Exception] rather than List[Throwable]
val errorSink: Sink[Try[Int], Future[List[Exception]]] =
  Flow[Try[Int]]
    .mapConcat { t =>
      t.failed.get match {
        case ex: Exception => List(ex)
        case _ => Nil
      } : List[Exception]
    }
    .toMat(Sink.fold(List.empty[Exception]) { (exes, ex) => ex :: exes })(Keep.right)

// materializes as a future of the exceptions which failed in mapAsyncUnordered
// (liftToFutTry needs an implicit ExecutionContext in scope)
val flow1: Flow[Int, Int, Future[List[Exception]]] =
  Flow[Int]
    .mapAsyncUnordered(2) { x =>
      val fut =
        if (x != 7) Future.successful(x)
        else Future.failed(new Exception(s"$x has failed (equaled 7)"))

      liftToFutTry(fut)
    }
    .divertToMat(errorSink, _.isFailure)(Keep.right)  // propagate the failures
    .map { successfulTry => successfulTry.get }

If you have two Flows like this (say flow2 is built the same way as flow1) and you want to compose them, you'd do:

// materialized value is (list of failures from flow1, list of failures from flow2)
val both: Flow[Int, Int, (Future[List[Exception]], Future[List[Exception]])] =
  flow1
    .viaMat(flow2)(Keep.both)

// materialized value is:
//  (
//    (
//      list of failures from flow1,
//      list of failures from flow2
//    ),
//    list of ints which passed through both flow1 and flow2
//  )
both.toMat(sink)(Keep.both) : Sink[Int, ((Future[List[Exception]], Future[List[Exception]]), Future[List[Int]])]

There are other ways to encode can-fail: e.g. you could use Either from the standard library.
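
For instance, an Either-based counterpart of liftToFutTry might look like the sketch below. It uses only the standard library (Scala 2.12+ for Future.transform and Try.toEither); liftToFutEither and LiftToEither are hypothetical names chosen for this example.

```scala
import scala.concurrent.{ Await, ExecutionContext, Future }
import scala.concurrent.duration._
import scala.util.Success

object LiftToEither {
  // Hypothetical helper, analogous to liftToFutTry but producing an Either.
  // The resulting future succeeds for both outcomes of the original future.
  def liftToFutEither[T](fut: Future[T])(
      implicit ec: ExecutionContext): Future[Either[Throwable, T]] =
    fut.transform(t => Success(t.toEither))

  def main(args: Array[String]): Unit = {
    import ExecutionContext.Implicits.global

    val ok = Await.result(liftToFutEither(Future.successful(1)), 1.second)
    val failed =
      Await.result(liftToFutEither(Future.failed[Int](new Exception("boom"))), 1.second)

    println(ok)                             // Right(1)
    println(failed.left.map(_.getMessage))  // Left(boom)
  }
}
```

With this encoding, the divert predicate would be _.isLeft instead of _.isFailure.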

Accumulating a Future[List[_]] may be questionable; note that the Futures won't be complete until the stream finishes.
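
The timing caveat can be seen in a small sketch, assuming Akka 2.6+. It uses Source.maybe to hold the stream open on purpose; CompletionTimingSketch and open are made-up names. The materialized Future of an accumulating sink stays incomplete, however many elements have already arrived, until upstream completes.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Keep, Sink, Source }
import scala.concurrent.{ Await, Future, Promise }
import scala.concurrent.duration._

object CompletionTimingSketch {
  // Emits 1, 2, 3 and then stays open until the returned promise is completed
  def open()(implicit system: ActorSystem): (Promise[Option[Int]], Future[Seq[Int]]) =
    Source(1 to 3)
      .concatMat(Source.maybe[Int])(Keep.right)
      .toMat(Sink.seq)(Keep.both)
      .run()

  def main(args: Array[String]): Unit = {
    implicit val system: ActorSystem = ActorSystem("completion-timing")
    val (keepOpen, collected) = open()

    println(collected.isCompleted) // false: upstream has not completed yet

    keepOpen.success(None)         // complete the source without another element
    println(Await.result(collected, 5.seconds)) // now holds the three elements
    system.terminate()
  }
}
```

If the errors are needed while the stream is still running, a sink that acts on each element (for example Sink.foreach) may be a better fit than an accumulating one.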
