Cancelling the collect of the Kotlin flow

Time:11-12

I have a parent class that can be in different states. The parent holds a list of child objects, and each child has its own state. I want to collect each child's state and cancel only the collector whose child reaches the Terminated state. Something like this:

coroutineScope.launch(Dispatchers.IO) {
   parent.parentState.collect {
      if(it is ParentState.Normal){
         it.children.forEach{ child ->
             coroutineScope.launch(Dispatchers.IO){
                child.childState.collect{
                    if(it is ChildState.Terminated){
                       // When this line executes, all the collectors stop until I change the state of each one of them.
                       this.coroutineContext.job.cancel()
                    } else{
                       // Do something else for any other state...
                    }
                }
             }

         }
      }
   }
}

But when I do that, all the children I am collecting from stop collecting. Each one starts collecting again only after I change its state, which wasn't the case before I cancelled one of them.

So my question is: why does it behave like that when I cancel the job of just one collector?

Also, is there a better, more "reactive" way to write this?

CodePudding user response:

By default, a CoroutineScope uses a plain Job() in its CoroutineContext. With a plain Job(), a child that fails with an exception cancels the parent job, which in turn cancels every other running child.

To keep the other children active, you can use a special Job, SupervisorJob():

CoroutineScope(SupervisorJob() + Dispatchers.IO)
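To make the difference concrete, here is a minimal sketch that launches one failing child and one sibling and reports whether the sibling is still active afterwards. The helper name siblingSurvives and the no-op CoroutineExceptionHandler are assumptions added for illustration; they are not part of the original code. Note that the difference shows up on failure (an uncaught exception); an ordinary cancel() of one child is not expected to cancel its siblings under either job type.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: runs a failing child and a sibling in a scope built on
// the given job, then reports whether the sibling is still active.
fun siblingSurvives(parentJob: CompletableJob): Boolean = runBlocking {
    // No-op handler so the uncaught exception is not reported as a crash.
    val handler = CoroutineExceptionHandler { _, _ -> }
    val scope = CoroutineScope(parentJob + Dispatchers.Default + handler)

    val sibling = scope.launch { delay(Long.MAX_VALUE) } // runs until cancelled
    val failing = scope.launch { throw IllegalStateException("boom") }

    failing.join()               // wait until the failure has been processed
    val alive = sibling.isActive // false if the failure cancelled the sibling
    scope.cancel()               // clean up whatever is still running
    alive
}

fun main() {
    println("Job():           sibling alive = ${siblingSurvives(Job())}")
    println("SupervisorJob(): sibling alive = ${siblingSurvives(SupervisorJob())}")
}
```

With a plain Job() the sibling should be reported as no longer active, while with SupervisorJob() it should still be running.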

With that change, your code would look like this:

val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
scope.launch {
   parent.parentState.collect {
      if(it is ParentState.Normal){
         it.children.forEach{ child ->
             scope.launch { // launch in the supervisor scope so siblings stay independent
                child.childState.collect{
                    if(it is ChildState.Terminated){
                       //when this line executed all the collectors stop until I change the states for each one of them..
                       this.coroutineContext.job.cancel()
                    } else{
                       // Do something else for any other state...
                    }
                }
             }

         }
      }
   }
}
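As a possible alternative to cancelling the job by hand, the flow itself can be made to complete when the terminal state arrives, for example with the takeWhile operator. The sketch below is only an illustration: the ChildState hierarchy here is a hypothetical stand-in for the one in the question, and collectUntilTerminated is an invented name.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical stand-in for the question's ChildState hierarchy.
sealed class ChildState {
    data class Running(val step: Int) : ChildState()
    object Terminated : ChildState()
}

// Handles states until Terminated is seen; the collection then completes on
// its own, without cancelling any job explicitly.
suspend fun collectUntilTerminated(states: Flow<ChildState>): List<ChildState> {
    val handled = mutableListOf<ChildState>()
    states
        .takeWhile { it !is ChildState.Terminated } // stop at the first Terminated
        .collect { handled += it }                  // "do something else" goes here
    return handled
}

fun demo(): List<ChildState> = runBlocking {
    val states = flowOf(
        ChildState.Running(1),
        ChildState.Running(2),
        ChildState.Terminated,
        ChildState.Running(3) // never reached by the collector
    )
    collectUntilTerminated(states)
}
```

In this sketch only Running(1) and Running(2) are handled. If the Terminated value itself also needs handling before stopping, transformWhile is one operator that can emit it and then end the flow.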

CodePudding user response:

I totally agree that SupervisorJob can solve your problem.

But in my opinion you don't need that many child coroutines; a single one is enough. Your code creates one child coroutine per element of the children collection. Coroutines are lightweight, but I think this is unnecessary overhead.

You can convert the List<Flow<T>> into a single Flow<List<T>> before collecting. After that, only the one combined Flow needs to be collected.

Here is how I handled it:

inline fun <reified T> List<Flow<T>>.flattenFlow(): Flow<List<T>> = combine(this@flattenFlow) {
    it.toList()
}
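To show what the combined flow delivers, here is a small sketch using two MutableStateFlow instances as stand-ins for the children's state flows. The string states and the snapshots function are assumptions made up for this example. Keep in mind that combine emits only after every source flow has emitted at least once, and then re-emits the full list whenever any source changes.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Same helper as in the answer, repeated so the sketch is self-contained.
inline fun <reified T> List<Flow<T>>.flattenFlow(): Flow<List<T>> =
    combine(this@flattenFlow) { it.toList() }

// Hypothetical demo: reads the combined list before and after one child changes.
fun snapshots(): List<List<String>> = runBlocking {
    val first = MutableStateFlow("Running")
    val second = MutableStateFlow("Running")
    val combined = listOf<Flow<String>>(first, second).flattenFlow()

    val before = combined.first() // latest value of every child
    second.value = "Terminated"   // one child changes state
    val after = combined.first()  // the whole list is delivered again

    listOf(before, after)
}

fun main() {
    println(snapshots()) // [[Running, Running], [Running, Terminated]]
}
```

Each emission carries the latest state of every child, so the collector sees the Terminated child alongside the others rather than as a separate event.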

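coroutineScope.launch(Dispatchers.IO) {
    parent.parentState.collect {
        if (it is ParentState.Normal) {
            // Combine every child's state flow into one Flow<List<ChildState>>.
            it.children.map { child -> child.childState }.flattenFlow().collect { childStateList ->
                childStateList.forEach { childState ->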
                    if (childState is ChildState.Terminated) {
                        // Do something when the state is Terminated...
                    } else {
                        // Do something else for any other state...
                    }
                }
            }
        }
    }
}
