I have a general question about Akka Streams.
I need to change stream behavior based on a variable outside of Akka. The variable is static and is changed by another piece of code.
How would you achieve this? Simply by checking the variable for each stream element? For example:
.filterNot(ping -> pingRecieved)
pingRecieved is a static variable in a Java class.
CodePudding user response:
It is legal to have a stream stage check some global state and alter its behavior based on that state.
Whether it's a great idea is another question entirely.
At minimum, you'll want to be aware of the limits and subtleties of the Java Memory Model around visibility. If the code writing to that variable isn't executing on the same thread as the stream stage, there's no guarantee about when (or possibly even whether) the stream stage will see the write. If the writer is outside of Akka, it categorically won't be on the same thread; if it's code executed by an actor on the same dispatcher as the stream stage, it might at some point execute on the same thread, but controlling that is going to require some tradeoffs. Ensuring that visibility (e.g. by making the variable volatile or using atomics) may in turn have substantial implications for performance, etc.
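As a minimal sketch of the atomics option, assuming the flag from the question can be moved into a holder class (the name PingState and the helper method are hypothetical, and the loop merely stands in for the stream's filterNot stage):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical holder for the flag from the question; the class name is illustrative.
final class PingState {
    // AtomicBoolean has volatile semantics: a set() on one thread is visible
    // to any later get() on another thread.
    static final AtomicBoolean pingRecieved = new AtomicBoolean(false);

    // Stand-in for filterNot(ping -> pingRecieved): the flag is re-read for
    // every element, and elements are dropped while it is true.
    static List<String> dropWhileFlagSet(List<String> pings) {
        List<String> kept = new ArrayList<>();
        for (String ping : pings) {
            if (!pingRecieved.get()) {
                kept.add(ping);
            }
        }
        return kept;
    }
}
```

In the stream from the question, the predicate would then read the flag through get(), i.e. ping -> PingState.pingRecieved.get(). A plain static volatile boolean would give the same visibility guarantee; neither makes the timing of the change relative to in-flight elements deterministic.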
You may want to investigate alternatives, such as a custom stream stage which materializes as an object whose methods propagate updates to that value to the stage (e.g. via the async callback mechanisms in Akka): these updates would be guaranteed to become visible to the stage, and the object would abstract away the concurrency. Another option would be to expose a source (e.g. a Source.queue) which injects changes to that value as stream elements which get merged into the stream and interpreted by the stream to change its behavior. Alternatively, in some cases it might be useful to use mapAsync or ask to pass stream elements to an actor.
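The "changes as stream elements" idea can be modeled in plain Java. This is only a sketch of the concept, not the Akka API: the Event type and the ControlledFilter class are hypothetical, and the input list stands in for the result of merging the data stream with the control source.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model (not the Akka API) of control elements merged into a stream.
final class ControlledFilter {
    // Hypothetical element type: either a data ping or an update to the flag.
    static final class Event {
        final String ping;      // non-null for data elements
        final Boolean newFlag;  // non-null for control elements

        private Event(String ping, Boolean newFlag) {
            this.ping = ping;
            this.newFlag = newFlag;
        }

        static Event ping(String p) { return new Event(p, null); }
        static Event setPingReceived(boolean v) { return new Event(null, v); }
    }

    // The flag is local to the processing loop, so only the "stage" reads or
    // writes it and no cross-thread visibility question arises.
    static List<String> run(List<Event> merged) {
        boolean pingReceived = false;
        List<String> out = new ArrayList<>();
        for (Event e : merged) {
            if (e.newFlag != null) {
                pingReceived = e.newFlag;   // control element: update state, emit nothing
            } else if (!pingReceived) {
                out.add(e.ping);            // data element: pass through while flag is false
            }
        }
        return out;
    }
}
```

For example, feeding ping("p1"), setPingReceived(true), ping("p2"), setPingReceived(false), ping("p3") through run keeps p1 and p3. In an actual Akka stream the same shape would presumably be built from a Source.queue merged with the main source and a stateful operator holding the flag, with the external code offering updates to the queue instead of writing a static variable.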