Home > Back-end >  Akka Source that emits when another Sink receives
Akka Source that emits when another Sink receives

Time:03-30

I have a source a that emits values into a sink b.

Now I want to have another source c that emits a value, everytime b receives an event.

My idea was to use another sink d that can be used as a notifier, but then I need the functionality to create a Source from a Sink.

a.to(b).alsoTo(d)

something like

Source.from(d)

CodePudding user response:

Another way of describing this is that you want every event emitted by a to go to both b and c. This is what a BroadcastHub does; it can be used to allow events from one Source to be consumed by multiple Sinks.

If you connect a Source to a BroadcastHub.sink and then materialise it, you get a new Source. This Source can then be attached to 2 or more Sinks and each Sink will get a copy of the message sent by the original Source.

For example I use this with Akka to have a Actor that broadcasts messages to multiple clients (for gRPC events):

val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
  ActorSource.actorRef[Event](
    completionMatcher = PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    16,
    OverflowStrategy.fail
  )
    .toMat(BroadcastHub.sink)(Keep.both)
    .run()

This creates eventSource which can be used in a pipeline and materialised multiple times to create multiple streams. Each time a message is sent to the actorRef, every stream that was materialised from eventSource receives that message.

See the documentation for more details.

  • Related