Akka - Why are stashed messages with backoff supervision lost?

Time:01-11

Question

I seem to be observing a scenario in which messages stashed for a typed, supervised actor are lost during a restart when using Akka's backoff supervision strategy.

Is this expected behavior? If not, how can I ensure that these stashed messages are retained?

The Setup

I create a typed, supervised actor whose backoff strategy has a stash:

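    // minBackoff = 10 ms, maxBackoff = 10 s, randomFactor = 0 (no jitter);
    // up to 2000 messages are stashed while the supervisor is backing off
    BackoffSupervisorStrategy backoff = SupervisorStrategy
        .restartWithBackoff(Duration.ofMillis(10), Duration.ofMillis(10000), 0)
        .withStashCapacity(2000);
    return Behaviors.supervise(Behaviors.setup(MyActor::new)).onFailure(Throwable.class, backoff);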
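The backoff window is when messages get stashed: while the supervisor waits to restart the child, incoming messages go into its stash, bounded by withStashCapacity. The following plain-Java sketch shows how long that window is for the settings above. It assumes the usual exponential formula, min(maxBackoff, minBackoff * 2^restartCount) scaled by a random multiplier in [1, 1 + randomFactor), with restartCount starting at 0. That Akka computes the delay exactly this way is an assumption, and `BackoffDelay` and `delay` are hypothetical names, not Akka API.

    import java.time.Duration;
    import java.util.concurrent.ThreadLocalRandom;

    // Hypothetical sketch of an exponential backoff delay; not Akka's API.
    // Assumption: delay = min(maxBackoff, minBackoff * 2^restartCount) * (1 + random * randomFactor)
    class BackoffDelay {
        static Duration delay(int restartCount, Duration minBackoff, Duration maxBackoff, double randomFactor) {
            // With randomFactor = 0 the multiplier is exactly 1.0 (no jitter)
            double rnd = 1.0 + ThreadLocalRandom.current().nextDouble() * randomFactor;
            double exponentialMillis = minBackoff.toMillis() * Math.pow(2, restartCount);
            // Cap the exponential growth at maxBackoff before applying jitter
            double cappedMillis = Math.min(maxBackoff.toMillis(), exponentialMillis);
            return Duration.ofMillis((long) (cappedMillis * rnd));
        }

        public static void main(String[] args) {
            Duration min = Duration.ofMillis(10);
            Duration max = Duration.ofMillis(10000);
            // Same parameters as restartWithBackoff(10 ms, 10000 ms, 0) in the question
            for (int n = 0; n < 12; n++) {
                System.out.println("restart " + n + ": " + delay(n, min, max, 0).toMillis() + " ms");
            }
        }
    }

With these settings the delays double from 10 ms (10, 20, 40, ...) until they hit the 10 s cap at the eleventh consecutive restart, so the first few restarts leave only a very short window in which messages are stashed.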

It handles a ForceFail command by throwing a RuntimeException, so that the Akka supervisor takes over:

  private Behavior<Command> forceFail(ForceFail command) {
    getContext().getLog().info("Got fail command: {}", command.name);
    throw new RuntimeException(command.name);
  }

After spawning the actor, I send it a series of messages:

    testSystem.tell(new ForceFail("first fail"));
    testSystem.tell(new ForceFail("second fail"));
    testSystem.tell(new ForceFail("third fail"));

Each message makes the actor throw, which triggers a restart by the supervisor. I check the size of the StashBuffer right before the supervisor unstashes the messages during a restart.

During the first restart, the StashBuffer has a size of 2, as expected. However, during the second restart (triggered by the second message), the size is 0, whereas I would expect it to be 1.

[Screenshot: stash size]

The last message is not sent to dead letters either. It seems to be silently lost, with nothing logged about what happened to it.

Notes

In the Akka internals, I see that the StashBuffer unstashAll() method is called. Its Javadoc says:

If an exception is thrown by processing a message a proceeding messages and the message causing the exception have been removed from the StashBuffer, but unprocessed messages remain.

The wording is a bit odd, but it says that the messages in the stash are processed sequentially until all of them have been processed or one of them throws. The message that threw is removed, while the unprocessed messages remain in the stash. That is not what I'm observing, though.
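The documented contract can be modeled in a few lines of plain Java. `StashModel` is a hypothetical class, not Akka's StashBuffer; it only encodes the Javadoc wording: each message is taken off the buffer before it is processed, so the message that throws is gone and everything after it stays. Under that contract, a stash holding the second and third ForceFail messages should still hold one message after the second one fails.

    import java.util.ArrayDeque;
    import java.util.Deque;
    import java.util.function.Consumer;

    // Hypothetical model of the unstashAll() contract described in the Javadoc.
    // This is NOT Akka's StashBuffer; it only illustrates the documented semantics.
    class StashModel {
        private final Deque<String> buffer = new ArrayDeque<>();

        void stash(String message) {
            buffer.addLast(message);
        }

        int size() {
            return buffer.size();
        }

        // Removes each message before handing it to the handler, so a message that
        // throws is already gone, while the messages after it stay in the buffer.
        void unstashAll(Consumer<String> handler) {
            while (!buffer.isEmpty()) {
                String message = buffer.pollFirst();
                handler.accept(message); // an exception here stops the loop
            }
        }

        public static void main(String[] args) {
            StashModel stash = new StashModel();
            // Messages stashed while backing off after "first fail"
            stash.stash("second fail");
            stash.stash("third fail");
            System.out.println("stashed before restart: " + stash.size());
            try {
                // Every ForceFail throws, like forceFail() in the question
                stash.unstashAll(message -> {
                    throw new RuntimeException(message);
                });
            } catch (RuntimeException e) {
                System.out.println("failed on: " + e.getMessage());
            }
            // Per the documented contract, "third fail" should still be stashed
            System.out.println("stashed after failure: " + stash.size());
        }
    }

Running it prints a size of 2 before the restart, fails on "second fail", and reports a size of 1 afterwards, which is the value the question expected to see instead of 0.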

I'm using Akka 2.7.0.

Answer

You can embed the actor inside a router so that the mailbox lifecycle is detached from the actor lifecycle, and messages are not lost when the actor is restarted.

    import akka.actor.typed.{ActorSystem, Behavior, SupervisorStrategy}
    import akka.actor.typed.scaladsl.{Behaviors, Routers}

    object PrintActor {

      sealed trait Message

      final case class PrintMessage(value: String) extends Message
      final case class FailMessage(value: String) extends Message

      def apply(): Behavior[Message] =
        Behaviors.receive { (context, message) =>
          message match {
            case PrintMessage(value) =>
              println(s"Print Message :: ${value}")
              Behaviors.same
            case FailMessage(value) =>
              println(s"Fail Message :: ${value}")
              // Handled by the supervisor wrapped around this behavior
              throw new RuntimeException(value)
          }
        }
    }

    object Main {

      def createActors(): Behavior[Unit] =
        Behaviors.setup[Unit] { ctx =>
          val pool = Routers.pool(poolSize = 1) {
            // make sure the workers are restarted if they fail
            Behaviors.supervise(PrintActor()).onFailure[Exception](SupervisorStrategy.restart)
          }

          val router = ctx.spawn(pool, "printer-pool")

          // Alternate between messages that print and messages that fail
          (1 to 10).foreach { n =>
            if (n % 2 == 1)
              router ! PrintActor.PrintMessage(s"Print $n")
            else
              router ! PrintActor.FailMessage(s"Fail $n")
          }

          Behaviors.empty
        }

      def main(args: Array[String]): Unit = {
        val system = ActorSystem[Unit](createActors(), "system")
      }
    }

This pattern ensures that no messages are lost. Running it prints:

    Print Message :: Print 1
    Fail Message :: Fail 2
    Print Message :: Print 3
    Fail Message :: Fail 4
    Print Message :: Print 5
    Fail Message :: Fail 6
    Print Message :: Print 7
    Fail Message :: Fail 8
    Print Message :: Print 9
    Fail Message :: Fail 10

Answer

I'm not 100% sure, but I think there may be a bug in how supervision handles failures that happen while the backoff supervisor is unstashing its internal stash.

I've created an issue in the Akka tracker for further investigation: https://github.com/akka/akka/issues/31814
