I'm using the following snippet of code in my Akka classic project.
(persistence ? Persist("a", Some(100), 123)) (100.milliseconds)
.mapTo[Persisted]
.recover(ex => Failure(ex.getMessage()))
.pipeTo(self)
When I handle the success response of this ask
then the sender is self
. How can I get the sender of the response to this ask
? In other words, how can I get the address of the target/ recipient actor here?
Edit:
It's quite obvious that persistence
is the target of the ask. But what I really want to know is if there's a way to have persistence
address be available through the pipeTo
. Let's say there is an array of persistence
than I wouldn't know which Persist
message came from which persistence
.
CodePudding user response:
The answer is in your question. persistence
is the target of the ask.
It's possible that persistence
is delegating work to another actor (e.g. a worker in a pool), but that would reveal implementation details and tbh is probably not going to be that useful (which is part of why the ask pattern doesn't propagate sender).
If it's a protocol you control, explicitly adding an ActorRef
to the response is guaranteed to work.
You could roll your own version of the ask pattern which propagates the sender, though as noted above, it's probably not going to be that useful.
EDIT: to propagate persistence
into the forwarded reply, the easiest is to map
the Future
result of the ask into something which bundles persistence
with the result (as a sort of correlation ID), like:
(persistence ? Persist("a", Some(100), 123)) (100.milliseconds)
.mapTo[Persisted]
.map { response => persistence -> response }
.recoverWith { ex => persistence -> Failure(ex.getMessage) }
.pipeTo(self)
The messages that get sent will be tuples of an ActorRef
and either Persisted
or Failure
, so you'd match them in your receive with something like
case (persistenceTarget, Persisted(...)) => ???
case (persistenceTarget, Failure(msg)) => ???
(You can also explicitly change the protocol into something that includes the ActorRef
: a tuple is perhaps a little too primitive, but is convenient for this answer without knowing more details about the protocol)
Note that if persistence
is a field in your Actor
, it might change between when you send the ask and when you execute map
/recoverWith
: the map
/recoverWith
will end up picking up the changed value. This inadvertent "closing over" actor state is a longstanding source of nasty bugs in Akka, so it might be worth having a
val persistenceTarget = persistence
and replacing mentions of persistence
in the ask/map
/recoverWith
with persistenceTarget
: since persistenceTarget
is a local (and immutable) value, it's safe to close over.