How do I control which upstream hot flow a downstream flow collects from?

Time:10-31

Imagine there are two upstream hot flows and one downstream flow. I want to be able to collect from the downstream flow once, and for the downstream flow to collect from either of the two upstream flows. I want to be able to control, using some logic, which of the two upstream flows the downstream flow is collecting from, and I want to be able to switch back and forth arbitrarily until I stop collecting on the downstream flow.

For example, I call this once:

downstreamFlow.collect {
   // This is the endpoint of the flow
}

Now imagine a method called switch(). I would start collecting on upstream flow 1 like this:

downstreamFlow.switch(upstreamFlow1) // Now the downstreamFlow is emitting upstreamFlow1

I would switch to upstream flow 2 like this:

downstreamFlow.switch(upstreamFlow2) // Now the downstreamFlow is emitting upstreamFlow2

The top collect would collect from upstream 1, then upstream 2. I could switch back and forth arbitrarily. There could be an upstream 3, ...etc. What would switch() look like?

Looking for that or any other ideas that work.

Answer:

The way I interpret this is that you want a way to encapsulate this switching behavior such that collectors of the downstream flow don't need to know about the switching behavior or anything about the upstream flows.

The following is an idea I have about how it might be implemented. You end up with a Flow that is sort of hot, in the sense that all of its downstream collectors are switched to the new upstream when switch() is called. But if you use it with cold upstreams (not what you asked for, but nothing in the way I wrote it prevents it), it's sort of cold, in the sense that each downstream collector triggers its own upstream collection. You could of course change the signature of switch to prohibit cold upstreams, or you could change the switcherFlow function to return a hot wrapper around the implementation.

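import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flatMapLatest

// A Flow whose upstream source can be replaced while it is being collected.
interface SwitcherFlow<T> : Flow<T> {
    fun switch(upstream: Flow<T>)
}

@OptIn(ExperimentalCoroutinesApi::class) // flatMapLatest is marked experimental
class SwitcherFlowImpl<T> private constructor(
    // Holds the currently selected upstream; collectors see it via flatMapLatest.
    private val flowOfSources: MutableStateFlow<Flow<T>>
) : Flow<T> by flowOfSources.flatMapLatest({ it }), SwitcherFlow<T> {
    constructor(initialFlow: Flow<T>) : this(MutableStateFlow(initialFlow))

    // Setting a new value makes flatMapLatest cancel collection of the
    // previous upstream and start collecting the new one.
    override fun switch(upstream: Flow<T>) {
        flowOfSources.value = upstream
    }
}

fun <T> switcherFlow(initialFlow: Flow<T> = emptyFlow()): SwitcherFlow<T> =
    SwitcherFlowImpl(initialFlow)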
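
To see the switching in action, here is a minimal sketch of the same mechanism (a MutableStateFlow of flows passed through flatMapLatest) with two hot upstreams. To keep it self-contained it inlines the mechanism instead of using the SwitcherFlow class above. The names demo, upstream1, upstream2 and selected are made up for illustration, and the subscriptionCount waits are only there so that nothing is emitted before the hot upstream has a subscriber. It assumes everything runs on the single thread of runBlocking.

```kotlin
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.async
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.first
import kotlinx.coroutines.flow.flatMapLatest
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.runBlocking

@OptIn(ExperimentalCoroutinesApi::class)
fun demo(): List<String> = runBlocking {
    // Two hot upstreams (illustrative names, not from the original post).
    val upstream1 = MutableSharedFlow<String>()
    val upstream2 = MutableSharedFlow<String>()

    // Same idea as SwitcherFlowImpl: the state holds the selected upstream.
    val selected = MutableStateFlow<Flow<String>>(upstream1)
    val downstream: Flow<String> = selected.flatMapLatest { it }

    // Collect from the downstream exactly once; stop after three values.
    val collected = async { downstream.take(3).toList() }

    // Hot flows drop values when nobody is subscribed, so wait for the subscription.
    upstream1.subscriptionCount.first { it > 0 }
    upstream1.emit("1a")

    // Switch to upstream 2; collection of upstream 1 is cancelled first.
    selected.value = upstream2
    upstream2.subscriptionCount.first { it > 0 }
    upstream1.emit("1b") // no subscriber on upstream 1 any more, so this is dropped
    upstream2.emit("2a")

    // Switch back to upstream 1.
    selected.value = upstream1
    upstream1.subscriptionCount.first { it > 0 }
    upstream1.emit("1c")

    collected.await()
}

fun main() {
    println(demo()) // expected in this single-threaded setup: [1a, 2a, 1c]
}
```

One thing to keep in mind with this approach: MutableStateFlow ignores a new value that is equal to the current one, so switching to the flow that is already selected does nothing.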