Checking the message queue of an actor in between calculations in Scala/Akka

Time:11-06

I have an actor that, when it receives a certain message, starts doing some calculations in a loop and keeps at it for some time (it repeats the same calculation around 100 times). Now I need it to react to other messages that may arrive, as soon as possible. The best way would be to add an instruction to the loop like "if there is a message in the queue, react to it and then return here", but I haven't seen such functionality.

I thought the actor could send a message to itself instead of looping; such messages would be queued at the end of the mailbox, and it would react to other messages in between. But I've heard that communication is bad (much more time-consuming than calculation), and I don't know whether communication with self counts as such.

My question is: what do you think about this solution, and do you have any other ideas for handling communication in between calculations?

CodePudding user response:

Time-consuming computation should not be done in the main receive method as it reduces responsiveness of the system. Put the computation in a blocking Future or Task or other asynchronous object, and send a message to the actor when the computation completes. The actor can continue to process messages ASAP while the computation continues on a different thread.
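
As a minimal sketch of that idea in Akka Typed: the actor below starts the calculation in a Future and uses `context.pipeToSelf` to turn the outcome into a message to itself, so `Ping` is still answered while the calculation runs. The names `Worker`, `Start`, `Ping`, `Done`, `Failed`, and `compute` are hypothetical and chosen only for illustration. Running the Future on `context.executionContext` is an assumption made for brevity; a dedicated dispatcher may be the better choice for real workloads.

```scala
import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object Worker {
  sealed trait Command
  final case class Start(iterations: Int, replyTo: ActorRef[Long]) extends Command
  final case class Ping(replyTo: ActorRef[String]) extends Command
  // Hypothetical internal messages carrying the outcome of the background work
  private final case class Done(result: Long, replyTo: ActorRef[Long]) extends Command
  private final case class Failed(cause: Throwable) extends Command

  // Stand-in for the real calculation: sums 1..iterations
  def compute(iterations: Int): Long = (1L to iterations.toLong).sum

  def apply(): Behavior[Command] =
    Behaviors.receive { (context, msg) =>
      msg match {
        case Start(iterations, replyTo) =>
          // Assumption: the actor's own execution context is used for brevity
          implicit val ec: ExecutionContext = context.executionContext
          // The Future runs off the actor; its result comes back as a message
          context.pipeToSelf(Future(compute(iterations))) {
            case Success(result) => Done(result, replyTo)
            case Failure(cause)  => Failed(cause)
          }
          Behaviors.same

        case Ping(replyTo) =>
          // Handled promptly, even while a calculation is in flight
          replyTo ! "pong"
          Behaviors.same

        case Done(result, replyTo) =>
          replyTo ! result
          Behaviors.same

        case Failed(cause) =>
          context.log.error("Computation failed", cause)
          Behaviors.same
      }
    }
}
```

Because the Future body never touches the actor's state and only its result is piped back, the actor keeps its single-threaded guarantees.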

This gets more complicated if the actor needs to modify the computation while it is running (in response to messages) but the solution depends on what the computation is and what kind of modification is needed, so it isn't really possible to give a general answer.

CodePudding user response:

In general in Akka you want to limit the amount of work done "per unit", where a unit in this case is either:

  • an actor processing a message
  • work done in a Future/Task or a callback of the same

Overlong work units can easily limit the responsiveness of the entire system by consuming a thread. For tasks which aren't consuming CPU but are blocked waiting for I/O, those can be executed in a different thread pool, but for doing some CPU-consuming work, that doesn't really help.
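
For the blocking I/O case, one possible sketch is to run the blocking call on Akka's default blocking dispatcher, looked up via `DispatcherSelector.blocking()`, and pipe the result back to the actor. `BlockingLoader`, `Load`, `Loaded`, `LoadFailed`, and `blockingLookup` are hypothetical names, and the `Thread.sleep` merely stands in for a real blocking call.

```scala
import akka.actor.typed.{ActorRef, Behavior, DispatcherSelector}
import akka.actor.typed.scaladsl.Behaviors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.{Failure, Success}

object BlockingLoader {
  sealed trait Command
  final case class Load(key: String, replyTo: ActorRef[Int]) extends Command
  private final case class Loaded(value: Int, replyTo: ActorRef[Int]) extends Command
  private final case class LoadFailed(cause: Throwable) extends Command

  // Hypothetical stand-in for a blocking call such as a database query
  def blockingLookup(key: String): Int = {
    Thread.sleep(100)
    key.length
  }

  def apply(): Behavior[Command] =
    Behaviors.setup[Command] { context =>
      // Separate thread pool intended for blocking work
      val blockingEc: ExecutionContext =
        context.system.dispatchers.lookup(DispatcherSelector.blocking())

      Behaviors.receiveMessage[Command] {
        case Load(key, replyTo) =>
          // The blocking call occupies a thread of the blocking pool only
          context.pipeToSelf(Future(blockingLookup(key))(blockingEc)) {
            case Success(value) => Loaded(value, replyTo)
            case Failure(cause) => LoadFailed(cause)
          }
          Behaviors.same

        case Loaded(value, replyTo) =>
          replyTo ! value
          Behaviors.same

        case LoadFailed(cause) =>
          context.log.error("Lookup failed", cause)
          Behaviors.same
      }
    }
}
```

This keeps the default dispatcher free for other actors, but as noted above it does not help with CPU-bound work, which competes for the same cores whichever pool it runs on.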

So the broad approach, if you're doing a loop, is to suspend the loop's state into a message and send it to yourself. This introduces a small performance hit: constructing the message, sending it to yourself (guaranteed to be a local send), and destructuring it will likely take on the order of microseconds when the system is otherwise idle. In exchange, it can improve overall system latency.

For example, imagine we have an actor which will calculate the nth Fibonacci number. I'm implementing this using Akka Typed, but the broad principle applies in Classic:

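import akka.actor.typed.{ActorRef, Behavior}
import akka.actor.typed.scaladsl.Behaviors
import scala.collection.immutable.SortedMap

object Fibonacci {
  sealed trait Command

  case class SumOfFirstN(n: Int, replyTo: ActorRef[Option[Long]]) extends Command

  private object Internal {
    // The loop's state, suspended into a message
    case class Iterate(i: Int, a: Long, b: Long) extends Command
    // Start at i = 0 (b = F(0) = 0, a = F(-1) = 1) so that a request for n = 1 is answered too
    val initialIterate = Iterate(0, 1L, 0L)
  }

  // Initial behavior: nobody is waiting yet
  def apply(): Behavior[Command] =
    State(SortedMap.empty[Int, Set[ActorRef[Option[Long]]]]).behavior

  case class State(waiting: SortedMap[Int, Set[ActorRef[Option[Long]]]]) {
    def behavior: Behavior[Command] =
      Behaviors.receive { (context, msg) =>
        msg match {
          case SumOfFirstN(n, replyTo) =>
            if (n < 1) {
              replyTo ! None
              Behaviors.same
            } else {
              // Only start a run if one isn't already in progress
              if (waiting.isEmpty) {
                context.self ! Internal.initialIterate
              }

              val nextWaiting =
                waiting.updated(n, waiting.get(n).fold(Set(replyTo))(_.incl(replyTo)))
              copy(waiting = nextWaiting).behavior
            }

          case Internal.Iterate(i, a, b) =>
            // the ith fibonacci number is b, the (i-1)th is a
            if (waiting.rangeFrom(i).isEmpty) {
              // Nobody waiting for this run to complete
              if (waiting.nonEmpty) {
                // Remaining requests are for smaller n: start over
                context.self ! Internal.initialIterate
              }

              Behaviors.same
            } else {
              var nextWaiting = waiting
              var nextA = a
              var nextB = b

              // One bounded unit of work: advance ten steps, answering waiters on the way
              (1 to 10).foreach { x =>
                val next = nextA + nextB

                nextWaiting.get(x + i).foreach { waiters =>
                  waiters.foreach(_ ! Some(next))
                }

                nextWaiting = nextWaiting.removed(x + i)
                nextA = nextB
                nextB = next
              }

              // Suspend the loop state into a message; other messages get handled in between
              context.self ! Internal.Iterate(i + 10, nextA, nextB)
              copy(waiting = nextWaiting).behavior
            }
        }
      }
  }
}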

Note that multiple requests (if sufficiently temporally close) for the same number will only be computed once, and temporally close requests for intermediate results will result in no extra computation.
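
For comparison, here is a rough sketch of the same "send the loop state to yourself" pattern in Akka Classic. `ChunkedSum`, its messages, and `ChunkSize` are hypothetical and serve only to illustrate the shape; the actor sums the numbers 1 to `upTo` in chunks and can answer `Status` between chunks.

```scala
import akka.actor.{Actor, ActorRef, Props}

object ChunkedSum {
  final case class Start(upTo: Long, replyTo: ActorRef)
  case object Status
  // Internal message: the suspended loop state
  final case class Continue(next: Long, acc: Long, upTo: Long, replyTo: ActorRef)

  // Hypothetical chunk size; tune so one chunk stays short
  val ChunkSize = 1000L

  // Pure step: processes one chunk starting at `next`, returns (new next, new accumulator)
  def step(next: Long, acc: Long, upTo: Long): (Long, Long) = {
    val end = math.min(next + ChunkSize - 1, upTo)
    (end + 1, acc + (next to end).sum)
  }

  def props: Props = Props(new ChunkedSum)
}

class ChunkedSum extends Actor {
  import ChunkedSum._

  def receive: Receive = {
    case Start(upTo, replyTo) =>
      self ! Continue(1L, 0L, upTo, replyTo)

    case Continue(next, acc, upTo, replyTo) =>
      if (next > upTo) {
        // Loop finished: deliver the result
        replyTo ! acc
      } else {
        // Do one chunk, then queue the rest behind any pending messages
        val (newNext, newAcc) = step(next, acc, upTo)
        self ! Continue(newNext, newAcc, upTo, replyTo)
      }

    case Status =>
      // Answered between chunks rather than after the whole loop
      sender() ! "alive"
  }
}
```

The chunk size is the knob here: larger chunks mean fewer self-sends but longer delays before other messages are seen.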
