I have a source a that emits values into a sink b.
Now I want another source c that emits a value every time b receives an event.
My idea was to use another sink d as a notifier, but then I would need a way to create a Source from a Sink:
    a.alsoTo(d).to(b)
followed by something like
    Source.from(d)
Answer:
Another way of describing this is that you want every event emitted by a to go to both b and c. This is what a BroadcastHub does; it allows events from one Source to be consumed by multiple Sinks.
If you connect a Source to a BroadcastHub.sink and then materialise it, you get a new Source. This Source can then be attached to two or more Sinks, and each Sink will get a copy of every message sent by the original Source.
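Applied to the names in the question, this could look roughly like the sketch below. It assumes Akka 2.6 or later (where an implicit ActorSystem also supplies the materializer); the tick source, the buffer size, and the printed strings are hypothetical stand-ins, not part of the original setup.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{BroadcastHub, Keep, Sink, Source}
import scala.concurrent.duration._

object BroadcastHubSketch extends App {
  // Assumption: Akka 2.6+, so the implicit system also provides the materializer.
  implicit val system: ActorSystem = ActorSystem("broadcast-hub-sketch")
  import system.dispatcher

  // Hypothetical stand-in for source a: five "event" elements, one every 500 ms.
  val a = Source.tick(500.millis, 500.millis, "event").take(5)

  // Running a into the hub yields a new Source that can be materialised many times.
  // The hub's buffer size must be a power of two.
  val fromHub: Source[String, NotUsed] =
    a.toMat(BroadcastHub.sink[String](bufferSize = 8))(Keep.right).run()

  // Source c: emits a notification for every element that also flows to b.
  val c: Source[String, NotUsed] = fromHub.map(e => s"notify: b is receiving '$e'")

  // Hypothetical stand-in for sink b, plus a consumer for c.
  val bDone = fromHub.runWith(Sink.foreach(e => println(s"b: $e")))
  val cDone = c.runWith(Sink.foreach(println))

  // Shut the system down once both consumers have finished.
  bDone.zip(cDone).onComplete(_ => system.terminate())
}
```

Note that a consumer only sees elements emitted after it has attached to the hub, so c observes what a sends towards b rather than what b has actually processed; consumers that attach late miss earlier elements.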
For example, I use this with Akka to have an actor that broadcasts messages to multiple clients (for gRPC events):
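    import akka.actor.typed.ActorRef
    import akka.stream.OverflowStrategy
    import akka.stream.scaladsl.{BroadcastHub, Keep, Source}
    import akka.stream.typed.scaladsl.ActorSource

    // Requires an implicit ActorSystem (or Materializer) in scope for run().
    val (actorRef: ActorRef[Event], eventSource: Source[Event, akka.NotUsed]) =
      ActorSource.actorRef[Event](
        completionMatcher = PartialFunction.empty, // never completes on a message
        failureMatcher = PartialFunction.empty,    // never fails on a message
        bufferSize = 16,
        overflowStrategy = OverflowStrategy.fail
      )
        .toMat(BroadcastHub.sink)(Keep.both)
        .run()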
This creates eventSource, which can be used in a pipeline and materialised multiple times to create multiple streams. Each time a message is sent to the actorRef, every stream that was materialised from eventSource receives that message.
See the Akka Streams documentation on dynamic stream handling (BroadcastHub) for more details.