Home > Blockchain >  Comprehension of Actor with ExecutionContext
Comprehension of Actor with ExecutionContext

Time:11-28

As I understand Akka parallelism, to handle each incoming message Actor use one thread. And this thread contains one state. As is it so, sequential messages does't share this states.

But Actor may have an ExecutorContext for execute callbacks from Future. And this is the point, where I stop understanding parallelism clearly.

For example we have the following actor:

  class AnyActor(target: ActorRef) extends Actor { 
          implicit val ec: ExecutionContext = context.dispatcher

          def receive = {
               case messageA =>
                    val api = createApi()
                    val furureA: Future[F] = api.callA 
                        api.close()
                    futureA.pipeTo(sender()) 
               case messageB =>
                    val api = createApi()
                    val furureB: Future[F] = api.callB 
                        api.close()
                    futureB.pipeTo(sender()) 
             }
  }

Suppose, Actor receive messageA, and Thread1 create instance of api - let 's call "api1". Also there is executionContext with N of threads. One of this threads is used for retrieve result from furureA.

What I don't understand, is how this N threads correlate with Thread1. ExecutionContext is created only for Thread1? Or it is also created for Thread2 (in which messageB handled)?

CodePudding user response:

Broadly, actors run on an dispatcher which selects a thread from a pool and runs that actor's Receive for some number of messages from the mailbox. There is no guarantee in general that an actor will run on a given thread (ignoring vacuous examples like a pool with a single thread, or a dispatcher which always runs a given actor in a specific thread).

That dispatcher is also a Scala ExecutionContext which allows arbitrary tasks to be scheduled for execution on its thread pool; such tasks include Future callbacks.

So in your actor, what happens when a messageA is received?

  • The actor calls createApi() and saves it
  • It calls the callA method on api
  • It closes api
  • It arranges to forward the result of callA when it's available to the sender
  • It is now ready to process another message and may or may not actually process another message

What this actually means depends on what callA does. If callA schedules a task on the execution context, it will return the future as soon as the task is scheduled and the callbacks have been arranged; there is no guarantee that the task or callbacks have been executed when the future is returned. As soon as the future is returned, your actor closes api (so this might happen at any point in the task's or callbacks' execution).

In short, depending on how api is implemented (and you might not have control over how it's implemented) and on the implementation details, the following ordering is possible

  • Thread1 (processing messageA) sets up tasks in the dispatcher
  • Thread1 closes api and arranges for the result to be piped
  • Thread2 starts executing task
  • Thread1 moves on to processing some other message
  • Thread2's task fails because api has been closed

In short, when mixing Futures and actors, the "single-threaded illusion" in Akka can be broken: it becomes possible for arbitrarily many threads to manipulate the actor's state.

In this example, because the only shared state between Futureland and actorland is local to the processing of a single message, it's not that bad: the general rule in force here is:

  • As soon as you hand mutable (e.g. closeable) state from an actor to a future (this includes, unless you can be absolutely sure what's happening, calling a method on that stateful object which returns a future), it's best for the actor to forget about the existence of that object

How then to close api?

Well, assuming that callA isn't doing anything funky with api (like saving the instance in some pool of instances), after messageA is done processing and the future is completed, nothing has access to api. So the simplest, and likely most correct, thing to do is arrange for api to be closed after the future has completed, along these lines

val api = createApi()
val futureA: Future[F] = api.callA

futureA.foreach { _ => api.close() }
futureA.pipeTo(sender()) 
  • Related