Question
I seem to be observing a scenario in which the messages stashed for a typed supervised actor are lost during a restart, using the Akka backoff supervision strategy.
Is this expected behavior? If not, how can I ensure these stashed messages are retained?
The Setup
I create a typed supervised actor with a stash:
BackoffSupervisorStrategy backoff = SupervisorStrategy
    .restartWithBackoff(Duration.ofMillis(10), Duration.ofMillis(10000), 0)
    .withStashCapacity(2000);
return Behaviors.supervise(Behaviors.setup(MyActor::new)).onFailure(Throwable.class, backoff);
It handles a ForceFail command, which results in a RuntimeException so that we can let the Akka supervisor do its thing:
private Behavior<Command> forceFail(ForceFail command) {
    getContext().getLog().info("Got fail command: {}", command.name);
    throw new RuntimeException(command.name);
}
After spawning the actor, I send a series of tells:
testSystem.tell(new ForceFail("first fail"));
testSystem.tell(new ForceFail("second fail"));
testSystem.tell(new ForceFail("third fail"));
Each tell results in an exception in the actor, triggering a restart by the supervisor. I check the size of the StashBuffer right before the supervisor unstashes the messages during a restart.
What I see is that during the first restart, the StashBuffer shows a size of 2, as expected. However, during the second restart (triggered by the second message), the size is 0, where I would expect it to be 1.
I do not see the last message get sent to the dead letter actor. It seems to be lost, with no logging describing what happens to it.
Notes
I see in the Akka internal code that the StashBuffer unstashAll() method is called. As written in the Javadocs:
If an exception is thrown by processing a message a proceeding messages and the message causing the exception have been removed from the StashBuffer, but unprocessed messages remain.
The wording seems a bit funny, but what it's saying is that it will sequentially process the messages in the stash until it processes all of them or we hit an exception. Unhandled messages remain in the stash. This does not seem to be what I'm observing though.
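To make the expectation concrete, here is a minimal plain-Java sketch of the behavior the Javadoc describes. It is only a model of my reading of that sentence, not Akka's implementation: StashModel, unstashAll and sizesAtEachRestart are hypothetical names, and a simple Deque stands in for the StashBuffer. Every handler call throws, mirroring ForceFail.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical model of the documented unstashAll semantics; not Akka code.
final class StashModel {

    // Processes stashed messages in order and stops at the first failure.
    // The failing message and all earlier ones are removed; later ones stay.
    // Returns true if the whole stash was processed without an exception.
    static boolean unstashAll(Deque<String> stash, Consumer<String> handler) {
        while (!stash.isEmpty()) {
            String message = stash.removeFirst(); // removed before it is handled
            try {
                handler.accept(message);
            } catch (RuntimeException e) {
                return false; // unprocessed messages remain in the stash
            }
        }
        return true;
    }

    // Records the stash size right before each unstash, assuming a first
    // failure has already triggered the initial restart.
    static List<Integer> sizesAtEachRestart(List<String> stashedDuringBackoff) {
        Deque<String> stash = new ArrayDeque<>(stashedDuringBackoff);
        List<Integer> sizes = new ArrayList<>();
        boolean failed = true;
        while (failed) {
            sizes.add(stash.size());
            // Every message fails, like the ForceFail command in the question.
            failed = !unstashAll(stash, m -> { throw new RuntimeException(m); });
        }
        return sizes;
    }

    public static void main(String[] args) {
        // "first fail" is handled directly; the other two are stashed during backoff.
        System.out.println(sizesAtEachRestart(List.of("second fail", "third fail")));
        // prints [2, 1, 0]
    }
}
```

Under this reading the sizes seen before each unstash would be 2, then 1, then 0, whereas I observe 2 and then 0.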
I'm using Akka 2.7.0.
Answer
You can embed the actor inside a router, so that the mailbox lifecycle is detached from the actor lifecycle and the messages are not lost when the actor gets restarted.
import akka.actor.typed.{ActorSystem, Behavior, SupervisorStrategy}
import akka.actor.typed.scaladsl.{Behaviors, Routers}

object PrintActor {
  sealed trait Message
  final case class PrintMessage(value: String) extends Message
  final case class FailMessage(value: String) extends Message

  def apply(): Behavior[Message] =
    Behaviors.receive { (context, message) =>
      message match {
        case PrintMessage(value) =>
          println(s"Print Message :: ${value}")
          Behaviors.same
        case FailMessage(value) =>
          println(s"Fail Message :: ${value}")
          // throwing here hands control to the supervisor
          throw new RuntimeException(value)
      }
    }
}

object Main {
  def createActors(): Behavior[Unit] =
    Behaviors.setup[Unit] { ctx =>
      val pool = Routers.pool(poolSize = 1) {
        // make sure the workers are restarted if they fail
        Behaviors.supervise(PrintActor()).onFailure[Exception](SupervisorStrategy.restart)
      }
      val router = ctx.spawn(pool, "printer-pool")
      // odd numbers print, even numbers make the worker fail
      (1 to 10).foreach { n =>
        if (n % 2 == 1)
          router ! PrintActor.PrintMessage(s"Print $n")
        else
          router ! PrintActor.FailMessage(s"Fail $n")
      }
      Behaviors.empty
    }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem.apply[Unit](createActors(), "system")
  }
}
With this pattern no messages are lost in the run above; all ten are processed in order:
Print Message :: Print 1
Fail Message :: Fail 2
Print Message :: Print 3
Fail Message :: Fail 4
Print Message :: Print 5
Fail Message :: Fail 6
Print Message :: Print 7
Fail Message :: Fail 8
Print Message :: Print 9
Fail Message :: Fail 10
Answer
Not 100% sure, but I think there may be a bug in supervision for failures that happen while the backoff supervisor is unstashing its internal stash.
I've created an issue in the Akka tracker for further investigation: https://github.com/akka/akka/issues/31814