akka-stream pipeline backpressures desipite inserted buffer

Time: 07-26

I have an akka-stream pipeline that fans out events (via BroadcastHub) that are pushed into the stream via a SourceQueueWithComplete.

Despite every downstream consumer having a .buffer() inserted (which I expected to keep the upstream buffers of the hub and the queue drained), I still observe backpressure kicking in after the system has been running for a while.

Here's a (simplified) snippet:

import akka.NotUsed
import akka.stream.{Materializer, OverflowStrategy}
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.ExecutionContext
import scala.util.Failure

class NotificationHub[Event](
  implicit materializer: Materializer,
  ecForLogging: ExecutionContext
) {

  // a SourceQueue to enqueue events and a BroadcastHub to allow multiple subscribers
  private val (queue, broadCastSource) =
    Source.queue[Event](
      bufferSize = 64,
      // we expect the buffer to never run full and if it does, we want
      // to log that asap, so we use OverflowStrategy.backpressure
      OverflowStrategy.backpressure
    ).toMat(BroadcastHub.sink)(Keep.both).run()

  // This keeps the BroadcastHub drained while there are no subscribers
  // (see https://doc.akka.io/docs/akka/current/stream/stream-dynamic.html ):
  broadCastSource.to(Sink.ignore).run()

  def notificationSource(p: Event => Boolean): Source[Unit, NotUsed] = {
    broadCastSource
      .collect { case event if p(event) => () }
      // this buffer is intended to keep the upstream buffers of
      // queue and hub drained:
      .buffer(
        // if a downstream consumer ever becomes too slow to consume,
        // only the latest two notifications are relevant
        size = 2,
        // doesn't really matter whether we drop head or tail
        // as all elements are the same (), it's just important not
        // to backpressure in case of overflow:
        OverflowStrategy.dropHead
      )
  }

  def propagateEvent(
    event: Event
  ): Unit = {
    queue.offer(event).onComplete {
      case Failure(e) =>
        // unexpected backpressure occurred!
        println(e.getMessage)
        e.printStackTrace()
      case _ =>
        ()
    }
  }

}

Since the docs for buffer() say that with DropHead it never backpressures, I would have expected the upstream buffers to remain drained. Yet calls to queue.offer() still end up failing because of backpressure.
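For reference, DropHead means the buffer itself absorbs overflow by discarding its oldest element instead of propagating demand upstream. A minimal plain-Scala sketch of that semantics (a hypothetical DropHeadBuffer for illustration, not akka's internal implementation):

```scala
import scala.collection.mutable

// Hypothetical stand-in (plain Scala, not akka): a bounded buffer with
// DropHead overflow semantics, i.e. offer() never blocks or backpressures.
final class DropHeadBuffer[A](capacity: Int) {
  private val q = mutable.Queue.empty[A]

  def offer(a: A): Unit = {
    if (q.size == capacity) q.dequeue() // discard the oldest element
    q.enqueue(a)                        // always accept the new one
  }

  def elements: List[A] = q.toList
}

val buf = new DropHeadBuffer[Int](2)
(1 to 5).foreach(buf.offer)
println(buf.elements) // List(4, 5): only the two newest elements survive
```

So as long as the buffer is actually asked for elements, it should never push backpressure upstream; the question is why the queue still fills up.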

Reasons that I could think of:

  1. Evaluation of predicate p in .collect causes a lot of load and hence backpressure. This seems very unlikely because those are very simple non-blocking ops.
  2. Overall system is totally overloaded. Also rather unlikely.

I have a feeling I am missing something. Do I maybe need to add an asynchronous boundary via .async before or after buffer() to fully decouple the "hub" from any heavy load that may occur somewhere further downstream?

CodePudding user response:

So after more reading of akka docs and some experiments, I think I found the solution (sorry for maybe asking here too early).

To fully detach my code from any heavy load that may occur somewhere downstream, I need to ensure that any downstream code is not executed by the same actor as the .buffer() (e.g. by inserting .async).

For example, this code would eventually lead to the SourceQueue running full and then backpressuring:

val hub: NotificationHub[Int] = // ...
hub.notificationSource(_ => true)
  .map { x =>
    Thread.sleep(250)
    x
  }
  .runWith(Sink.ignore)

Further inspections showed that this .map() would be executed on the same thread (of the underlying actor) as the upstream .collect() (and .buffer()).
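This is operator fusion: without an explicit boundary, adjacent stages run on a single actor, so the buffer can only admit the next element once that actor is done with the slow .map(). A toy model of the timing argument (hypothetical names, plain Scala, not akka code):

```scala
import scala.collection.mutable

// Toy model (not akka): compare how long upstream has to wait when the
// buffer and a slow step share one execution unit (fused) versus when a
// drop-head hand-off queue decouples them.

val events   = (1 to 5).toList
val slowCost = 3 // ticks the slow downstream step needs per element

// Fused: the producer must wait slowCost ticks per element before the
// shared execution unit is free to accept the next one.
def fusedUpstreamWait: Int = events.size * slowCost

// Decoupled: the producer only pays for a cheap enqueue; on overflow the
// oldest element is dropped instead of the producer waiting.
def decoupledRun(capacity: Int): (Int, List[Int]) = {
  val q = mutable.Queue.empty[Int]
  events.foreach { e =>
    if (q.size == capacity) q.dequeue() // drop head, never wait
    q.enqueue(e)
  }
  (0, q.toList) // zero wait ticks; only the newest elements remain
}

println(fusedUpstreamWait) // 15 ticks of upstream stall
println(decoupledRun(2))   // (0,List(4, 5))
```

In the fused case the wait propagates all the way back to the SourceQueue, which is exactly the observed backpressure.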

When inserting .async as shown below, the .buffer() would drop elements (as I had intended it to) and the upstream SourceQueue would remain drained:

val hub: NotificationHub[Int] = // ...
hub.notificationSource(_ => true)
  .async
  .map { x =>
    Thread.sleep(250)
    x
  }
  .runWith(Sink.ignore)