I can launch my stateFlow collection as below
val collectingScope = CoroutineScope(Dispatchers.Default)
val stateFlow = MutableStateFlow(0)
val myJob = collectingScope.launch {
stateFlow.collect {
println("collected $it")
}
}
And I can cancel it using
myJob.cancel()
But I wonder if I can also cancel it through the stateFlow instead?
I see there's a cancel() function, but it's deprecated:
@Deprecated(
message = "cancel() is resolved into the extension of outer CoroutineScope which is likely to be an error." +
"Use currentCoroutineContext().cancel() instead or specify the receiver of cancel() explicitly",
level = DeprecationLevel.ERROR,
replaceWith = ReplaceWith("currentCoroutineContext().cancel(cause)")
)
public fun FlowCollector<*>.cancel(cause: CancellationException? = null): Unit = noImpl()
If I can do so, does the cancellation also auto-cancel myJob?
CodePudding user response:
No. StateFlows cannot be cancelled. See the documentation:
State flow never completes. A call to Flow.collect on a state flow never completes normally, and neither does a coroutine started by the Flow.launchIn function. An active collector of a state flow is called a subscriber.
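In practice this means a state flow collection ends only when the collecting coroutine is cancelled. The sketch below, assuming kotlinx-coroutines-core is on the classpath, cancels from inside the collector with the explicit receiver that the deprecation message recommends. The helper name collectUntil is hypothetical. The call cancels the coroutine that runs collect, so it is myJob that gets cancelled, not the flow.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: collects until `stopAt` is seen, then cancels the
// collecting coroutine from inside the collector.
// Returns the collected values and whether the job ended up cancelled.
fun collectUntil(stopAt: Int): Pair<List<Int>, Boolean> = runBlocking {
    val stateFlow = MutableStateFlow(0)
    val seen = mutableListOf<Int>()
    val myJob = launch {
        stateFlow.collect { value ->
            seen += value
            // Explicit receiver: cancels the coroutine running collect (myJob).
            if (value == stopAt) currentCoroutineContext().cancel()
        }
    }
    stateFlow.value = stopAt // set before the collector gets a chance to run
    myJob.join()             // returns because myJob was cancelled
    seen.toList() to myJob.isCancelled
}

fun main() {
    val (seen, cancelled) = collectUntil(3)
    println("seen=$seen cancelled=$cancelled")
}
```

If the goal is only to stop collecting without cancelling the job, operators such as takeWhile or first end the collection normally instead.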
CodePudding user response:
Kotlin Flows in Android summary
Just like my previous article about coroutines, this is an attempt to group most of the information you’ll need to use Kotlin flows in Android and provide you with links you can use to get more in-depth details on any of the covered topics.
Flow is the Kotlin type that can be used to model streams of data. Just like LiveData and RxJava streams, Flow lets you implement the observer pattern: a software design pattern that consists of an object (a.k.a. observable, producer, source, emitter) that maintains a list of its dependents, called observers (subscribers, collectors, receivers, etc.), and notifies them automatically of any state changes. This terminology is used interchangeably throughout this article depending on the context, but those terms always refer to the same two entities.
Observables can be either hot or cold. Hot observables send or emit data even if no one is watching them and cold observables emit data only if they have an active observer. By default, Kotlin flows are cold.
Creating flows
The coroutines library provides you with several ways to create a flow. The easiest way is to use the flow builder:
fun numbersFlow(): Flow<Int> = flow {
// code inside flow can suspend!
for (i in 1..3) {
delay(100)
emit(i)
}
} // the function numbersFlow() itself does not suspend
There are other ways to create a Flow, including the flowOf() function, and the extension asFlow(), which can be used for collections, sequences, ranges, values or functional types:
flowOf(1,2,3)
listOf("A", "B", "C").asFlow()
(1..3).asFlow()
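As a rough, runnable illustration of the builders above (assuming kotlinx-coroutines-core; the function name builderResults is made up for this sketch), each flow can be materialized with toList to see what it emits:

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Collects one flow per builder into a list so the results can be compared.
fun builderResults(): List<List<Any>> = runBlocking {
    listOf(
        flow<Int> { for (i in 1..3) emit(i) }.toList(), // flow { } builder
        flowOf(1, 2, 3).toList(),                       // fixed set of values
        listOf("A", "B", "C").asFlow().toList(),        // from a collection
        (1..3).asFlow().toList(),                       // from a range
    )
}

fun main() {
    builderResults().forEach(::println)
}
```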
Collecting flows
Flows are cold, which means that the code inside a flow builder does not execute until a terminal operator is applied to the flow. The most common terminal operator is collect:
someCoroutineScope.launch {
numbersFlow().collect { value -> print(value) }
}
Some other terminal operators include:
toCollection(destination: C) collects the flow into the provided MutableCollection. This operator has two convenient wrappers: toList and toSet;
first() and last() return the respective element or throw NoSuchElementException if the flow is empty. There are also variants that take a predicate as an argument or return null instead of throwing an exception;
single() waits for exactly one value to be emitted. It throws NoSuchElementException if the flow is empty, like first(), or IllegalArgumentException if the flow contains more than one element. A singleOrNull() variation also exists;
reduce() and fold() can be used to reduce a flow to a single value.
An important thing to keep in mind is that all terminal operators are suspending functions and thus must be called from a coroutine. Also, all these operators will suspend the coroutine until the flow is fully collected.
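The terminal operators listed above can be tried out with a small sketch like the following (assuming kotlinx-coroutines-core; terminalOperatorDemo is a hypothetical name). It also checks which exception single() raises on a flow with several elements.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Runs several terminal operators on the same cold flow and gathers the results.
fun terminalOperatorDemo(): Map<String, Any?> = runBlocking {
    val numbers = flowOf(1, 2, 3)
    // single() fails here because the flow has more than one element.
    val singleFailure = runCatching { numbers.single() }.exceptionOrNull()
    mapOf(
        "toList" to numbers.toList(),
        "first" to numbers.first(),
        "firstEven" to numbers.first { it % 2 == 0 },
        "last" to numbers.last(),
        "fold" to numbers.fold(0) { acc, v -> acc + v },
        "reduce" to numbers.reduce { acc, v -> acc * v },
        "firstOrNullOnEmpty" to emptyFlow<Int>().firstOrNull(),
        "singleThrowsIllegalArgument" to (singleFailure is IllegalArgumentException),
    )
}

fun main() {
    terminalOperatorDemo().forEach { (name, result) -> println("$name -> $result") }
}
```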
To prevent the flow collection from suspending the current coroutine, you can use the launchIn operator:
someCoroutineScope.launch {
val job = numbersFlow()
.onEach { value -> println(value) }
.launchIn(scope = this)
}
The launchIn operator returns a Job that can be used to cancel() the flow collection without canceling the whole coroutine scope. If needed, you can use join() to wait for the job to complete.
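A minimal sketch of that behaviour, assuming kotlinx-coroutines-core: the endless ticks flow and the name launchInDemo are invented for illustration, and the delays are arbitrary. Cancelling the returned Job stops the collection while the enclosing scope stays active.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Returns (values seen at cancel time, values seen 50 ms later, scope still active).
fun launchInDemo(): Triple<List<Int>, List<Int>, Boolean> = runBlocking {
    val seen = mutableListOf<Int>()
    val ticks = flow<Int> {          // hypothetical endless source
        var i = 0
        while (true) { emit(i++); delay(10) }
    }
    val job = ticks.onEach { seen += it }.launchIn(this)
    delay(55)                        // the collection runs concurrently meanwhile
    job.cancelAndJoin()              // stops only this collection
    val atCancel = seen.toList()
    delay(50)                        // nothing more should arrive
    Triple(atCancel, seen.toList(), isActive)
}

fun main() {
    println(launchInDemo())
}
```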
Keep in mind that the operators first, last and toList can be very helpful when writing unit-tests for classes that expose flows.
Collecting flows in Android
In Android, you’ll usually end up collecting a Flow in the presentation layer of your app, i.e. a Fragment or an Activity. When the UI is not on the screen, we don’t need to display the information we’re getting, so we should stop flow collection. There are several lifecycle-aware alternatives you could use to handle this automatically:
Flow<T>.asLiveData(): LiveData
Lifecycle.repeatOnLifecycle(state)
Flow<T>.flowWithLifecycle(lifecycle, state)
Using asLiveData might not be the best solution, because it involves combining Flows with another library that’s actually not needed, but is very helpful in the process of migrating from LiveData to Flow.
repeatOnLifecycle is a suspending function and you need to specify a Lifecycle.State as its input. A new coroutine will be launched each time the lifecycle reaches that state and will automatically be canceled once the lifecycle goes below the specified state:
class MyActivity: AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
// ...
lifecycleScope.launch {
repeatOnLifecycle(Lifecycle.State.STARTED) {
viewModel.someFlow().collect {
// flow is collected during the time
// we’re in the STARTED state.
}
}
// resumes only when lifecycle is DESTROYED!!!
}
}
}
Note that repeatOnLifecycle suspends the calling coroutine until the lifecycle reaches the DESTROYED state. To collect several flows, you can launch several coroutines inside it:
repeatOnLifecycle(Lifecycle.State.STARTED) {
launch { viewModel.someFlow().collect { /* ... */ } }
launch { viewModel.someOtherFlow().collect { /* ... */ } }
}
If you only need to collect a single flow, flowWithLifecycle can be used:
lifecycleScope.launch {
viewModel.someFlow()
.flowWithLifecycle(lifecycle, Lifecycle.State.STARTED)
.collect { /* ... */ }
}
The APIs repeatOnLifecycle and flowWithLifecycle are the currently recommended approaches and are available in the androidx.lifecycle:lifecycle-runtime-ktx library (version 2.4.0 and above).
Intermediate operators
The behaviour of a flow can be modified by applying different operators to it, just like with collections and sequences. Operators are not suspending functions: they are immediately applied to an upstream flow (before/above the operator) and return a new, transformed flow, but are executed lazily as each item is emitted into the flow.
I can’t think of a better way of getting familiar with different operators than playing around with https://flowmarbles.com (flow’s version of https://rxmarbles.com). It’s an awesome resource that provides interactive diagrams of the operators you can use on flows. Note that you can move the marbles around and see how they affect the resulting flow.
Context
Unless special intermediate operators that use multiple flows are used, all emitted values are processed by the operators sequentially from upstream to downstream and then delivered to the terminal operator.
By default, code in builders, intermediate operators and the collection itself is performed in the coroutine context that invoked the terminal operator. This property of flows is called context preservation.
This might be ok for fast-running, non-blocking code, but for some long term operations you might want to execute the code in a different Dispatcher, like Default or IO.
In suspending functions you could use withContext() to change the Dispatcher, but code inside builders is not allowed to emit from a different context (that would break context preservation), and attempting to emit from inside withContext() will throw an IllegalStateException at runtime.
To change the context, you can use the flowOn() operator:
fun numbersFlow() = flow { // runs in Dispatchers.Default
for (i in 1..3) {
Thread.sleep(100) // some long-running operation
emit(i)
}
}
.onEach { /* still in Dispatchers.Default */ }
.flowOn(Dispatchers.Default) // changing dispatcher
lifecycleScope.launch(Dispatchers.Main) {
numbersFlow()
.onEach { /* in Dispatchers.Main */ }
.collect { value -> /* Dispatchers.Main */ }
}
Collection and emission are now happening concurrently in different coroutines. Remember that the flowOn operator only changes the coroutine context of the builder and the operators applied upstream of it.
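The difference between the two approaches can be checked with a sketch along these lines (assuming kotlinx-coroutines-core; wrongContext, rightContext and contextDemo are hypothetical names). Emitting from inside withContext fails at collection time, while flowOn moves the upstream work without breaking context preservation.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Emits from a different context than the collector's: violates context preservation.
fun wrongContext(): Flow<Int> = flow {
    withContext(Dispatchers.Default) { emit(1) }
}

// Same work, but the context switch is expressed with flowOn.
fun rightContext(): Flow<Int> = flow { emit(1) }.flowOn(Dispatchers.Default)

// Returns (did the first flow fail with IllegalStateException, values of the second flow).
fun contextDemo(): Pair<Boolean, List<Int>> = runBlocking {
    val failure = runCatching { wrongContext().toList() }.exceptionOrNull()
    (failure is IllegalStateException) to rightContext().toList()
}

fun main() {
    println(contextDemo())
}
```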
Buffering
As stated above, flows execute the code in the builder, the intermediate operators and the terminal operator sequentially:
flowOf("A", "B", "C")
.onEach { print("1$it ") }
.collect { print("2$it ") }
// prints "1A 2A 1B 2B 1C 2C"
As a consequence, the total execution time is going to be the sum of the execution times of all the operators. If you want the collector to stop waiting for all upstream operators on each emitted value, but don’t want to change the coroutine context with flowOn(), you can apply the buffer() operator:
flowOf("A", "B", "C")
.onEach { print("1$it ") }
.buffer()
.collect {
delay(100)
print("2$it ")
}
// prints "1A 1B 1C 2A 2B 2C "
The buffer() operator creates a separate coroutine during execution for the flow it is applied to. This coroutine will be used to execute the code in the producer and upstream operators.
If the consumer is slower than the producer, the buffer will become full at some point and, by default, will suspend the producer coroutine until the consumer coroutine catches up. This default strategy (BufferOverflow.SUSPEND) can be overridden by passing a different strategy to the operator, for example DROP_OLDEST or DROP_LATEST.
When a flow represents partial results of the operation or operation status updates, it may not be necessary to process each value, but instead, only the most recent ones. In this case, the conflate operator can be used to skip intermediate values when a collector is too slow to process them. Note that this function is just a shortcut for a buffer with a capacity of 0 and a DROP_OLDEST overflow strategy.
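A small sketch of conflation, assuming kotlinx-coroutines-core (conflateDemo is a hypothetical name and the delays are arbitrary). The exact set of skipped values depends on timing, but the collector should see the first value, skip some intermediate ones and always receive the most recent one.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Fast producer (10 ms per value), slow collector (100 ms per value).
fun conflateDemo(): List<Int> = runBlocking {
    val received = mutableListOf<Int>()
    flow<Int> {
        for (i in 1..5) { emit(i); delay(10) }
    }
        .conflate()                 // keep only the latest unprocessed value
        .collect { value ->
            received += value
            delay(100)              // pretend processing is slow
        }
    received
}

fun main() {
    println(conflateDemo())
}
```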
Another way to speed up processing with a slow collector is to cancel and restart it every time a new value arrives. You can use operators like collectLatest() for this behaviour.
someFlow()
.collectLatest { value ->
println("Collecting $value")
delay(300) // pretend we are processing it for 300 ms
println("Done $value")
}
Important note: remember that cancellation in coroutines is cooperative. The code in the collectLatest() block above is able to cancel because delay() is cancellable.
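To see the cancel-and-restart behaviour in a self-contained form, here is a sketch assuming kotlinx-coroutines-core (collectLatestDemo is a made-up name). Only the block for the last value gets past its delay, because earlier blocks are cancelled at that suspension point.

```kotlin
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns (values whose block started, values whose block ran to the end).
fun collectLatestDemo(): Pair<List<Int>, List<Int>> = runBlocking {
    val started = mutableListOf<Int>()
    val finished = mutableListOf<Int>()
    flowOf(1, 2, 3).collectLatest { value ->
        started += value
        delay(100)          // cancellable suspension point
        finished += value   // reached only if no newer value arrived
    }
    started.toList() to finished.toList()
}

fun main() {
    val (started, finished) = collectLatestDemo()
    println("started=$started finished=$finished")
}
```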
Exceptions
Flow collection can complete with an exception if it’s thrown inside the building block or any of the operators. Exceptions can be handled in both imperative and declarative ways:
fun numbers() = flow {
repeat(10) { emit(it) }
}
try {
numbers().collect {
check(it <= 1) { "Crashed on $it" }
print("$it;")
}
} catch (e: Exception) {
print("Exception caught(${e.message})!")
}
// prints "0;1;Exception caught(Crashed on 2)!"
Flows must be transparent to exceptions and it is a violation of the exception transparency principle to emit values in a flow builder from inside of a try/catch block. This guarantees that a collector throwing an exception can always catch it using try/catch as in the previous example.
If we define the numbers flow as:
fun doNotDoThis() = flow {
repeat(10) {
try {
emit(it)
} catch(e: Exception) {
print("this violates exception transparency!")
}
}
}
An exception thrown in the collector would be caught in the new try/catch block! Furthermore the flow would just continue emitting the 10 values and catching exceptions on every new value.
The correct way to encapsulate exception handling in an emitter is to use the catch operator:
fun strings() = listOf("A","B","C").asFlow()
.catch { cause: Throwable ->
// Inside the catch block you can:
// - ignore, log or process the exception
// - emit a new value using emit
// - re-throw the same exception or a new one using throw
}
The catch operator catches only upstream exceptions.
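Both properties of catch can be sketched as follows, assuming kotlinx-coroutines-core (catchDemo and the fallback value -1 are illustrative choices): an upstream failure is replaced by an emitted fallback, while a failure in the collector passes through untouched.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Returns (values after recovering from an upstream failure,
//          message of a downstream failure that catch did not intercept).
fun catchDemo(): Pair<List<Int>, String?> = runBlocking {
    val upstream = flow<Int> {
        emit(1)
        throw IllegalStateException("upstream failed")
    }
    // The upstream exception is caught and replaced by a fallback value.
    val recovered = upstream.catch { emit(-1) }.toList()

    // An exception thrown downstream of catch (in the collector) is not caught by it.
    val downstreamMessage = runCatching {
        flowOf(1, 2)
            .catch { emit(-1) }
            .collect { check(it < 2) { "collector failed on $it" } }
    }.exceptionOrNull()?.message

    recovered to downstreamMessage
}

fun main() {
    println(catchDemo())
}
```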
If we want to handle exceptions in a declarative way, we could move the body of the collect operator into an onEach, add a catch operator after it and collect the flow using the terminal operator collect() without input parameters:
numbers()
.onEach {
check(it <= 1) { "Crashed on $it" }
print("$it;")
}
.catch { print("Exception caught(${it.message})!") }
.collect()
// like the first example, this also prints:
// "0;1;Exception caught(Crashed on 2)!"
Completion
Sometimes, when a flow completes (either normally or by throwing an exception), you might need to perform some action. This can be done in two ways. The imperative way is to use a try/finally block:
fun numbers() = (1..3).asFlow().map { "$it;" }
try {
numbers().collect { value -> print(value) }
} finally {
print("Done")
} // prints "1;2;3;Done"
For a declarative approach, you can apply the intermediate operator onCompletion before collecting the flow:
numbers()
.onCompletion { cause: Throwable? ->
print("Done")
}
.collect { value -> print(value) }
// also prints "1;2;3;Done"
The advantage here is that the nullable parameter can be used to determine if the collection of the flow ended normally or with an exception.
An important thing to note is that, unlike catch, onCompletion does not handle the exception itself and it will still flow downstream. You must still use the operator catch or a try/catch block to handle the exception.
Another difference with the catch operator is that onCompletion sees all exceptions and receives a null exception only on successful completion of the upstream flow (without cancellation or failure): it’ll still receive the exception even if it’s thrown downstream, i.e. in the collector.
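The following sketch, assuming kotlinx-coroutines-core (completionDemo is a hypothetical name), records what onCompletion receives on normal completion and on a failure thrown in the collector. In the second case the exception still has to be caught separately.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

fun completionDemo(): List<String> = runBlocking {
    val log = mutableListOf<String>()

    // Normal completion: the cause is null.
    flowOf(1, 2)
        .onCompletion { cause -> log += "normal: cause=$cause" }
        .collect()

    // Failure in the collector: onCompletion sees it but does not handle it.
    try {
        flowOf(1, 2)
            .onCompletion { cause -> log += "failed: cause=${cause?.message}" }
            .collect { check(it < 2) { "boom on $it" } }
    } catch (e: IllegalStateException) {
        log += "caught: ${e.message}"
    }
    log
}

fun main() {
    completionDemo().forEach(::println)
}
```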
Cancellation
Flows adhere to the cooperative cancellation of coroutines (you can read more about coroutines and cooperative cancellation in my previous article). A flow collection can be canceled when the flow is suspended in a cancellable function (like delay).
The flow builder performs additional checks with ensureActive before emitting each new value. That said, other builders (like asFlow) don’t perform cancellation checks for optimization reasons and you’ll have to check for cancellation explicitly. For this, you can use the cancellable() operator or add the following code to any flow before it’s collected:
.onEach { currentCoroutineContext().ensureActive() }
Note that all implementations of SharedFlow and StateFlow are cancellable by default too.
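The effect of the explicit check can be observed with a sketch like this one, assuming kotlinx-coroutines-core (collectWithCancelAt3 is an invented helper). The collecting job is cancelled when the value 3 arrives. Without cancellable() the range flow keeps emitting, and with it the collection stops.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Collects (1..5).asFlow() in a child job that cancels itself on value 3.
fun collectWithCancelAt3(addCheck: Boolean): List<Int> = runBlocking {
    val seen = mutableListOf<Int>()
    val source = (1..5).asFlow()               // no cancellation checks of its own
    val flow = if (addCheck) source.cancellable() else source
    val job = launch {
        flow.collect { value ->
            if (value == 3) currentCoroutineContext().cancel()
            seen += value
        }
    }
    job.join()
    seen
}

fun main() {
    println("without check: ${collectWithCancelAt3(addCheck = false)}")
    println("with check:    ${collectWithCancelAt3(addCheck = true)}")
}
```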
Integration with other libraries
Kotlin flows are supported in multiple libraries, including Retrofit, WorkManager, DataStore and Room.
For example, starting with version 2.2 of Room, you can specify a flow as the return type of functions in a DAO and be notified of changes in the database:
@Dao
abstract class ExampleDao {
@Query("SELECT * FROM Example")
abstract fun getExamples(): Flow<List<Example>>
}
Ready-to-use converters for integration of flows with reactive streams such as RxJava are available in the corresponding modules of the coroutines library. Check the official docs for more info.
For LiveData interoperability, you can use the extension asLiveData() to convert a Flow to LiveData (available in the library androidx.lifecycle:lifecycle-livedata-ktx).
Just like with coroutines, where you can use suspendCancellableCoroutine to build a bridge between coroutines and a callback based API that returns a single value, you can use the callbackFlow builder to create a flow from any multi-shot callback API. A great explanation about both of them is available here.
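A rough sketch of the callbackFlow pattern, assuming kotlinx-coroutines-core. FakeSensor and readingsFlow are entirely hypothetical and stand in for whatever multi-shot callback API you need to wrap. For simplicity the fake delivers its readings synchronously on registration.

```kotlin
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Hypothetical callback-based API, invented for this sketch.
class FakeSensor(private val readings: List<Int>) {
    var listenerCount = 0
        private set

    fun register(callback: (Int) -> Unit) {
        listenerCount++
        readings.forEach(callback)   // delivers all readings synchronously
    }

    fun unregister(callback: (Int) -> Unit) {
        listenerCount--
    }
}

// Bridges the callback API to a flow.
fun FakeSensor.readingsFlow(): Flow<Int> = callbackFlow {
    val callback: (Int) -> Unit = { value -> trySend(value) }
    register(callback)
    // Keeps the flow open until the collector is done, then cleans up.
    awaitClose { unregister(callback) }
}

fun main() = runBlocking {
    val sensor = FakeSensor(listOf(10, 20, 30))
    println(sensor.readingsFlow().take(3).toList())
    println("listeners left: ${sensor.listenerCount}")
}
```

The awaitClose block is where the listener is unregistered. Without it the flow would complete as soon as the builder block returns.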
StateFlow and SharedFlow
As mentioned before, Flows are cold by default. This means that if we subscribe to a Flow, the code in its builder will get executed each time we subscribe to it. This is something that you might not want to do when an Activity or Fragment goes through a configuration change. To solve this, StateFlow can be used.
StateFlow is a hot observable that stores the latest value. When created, it requires you to pass an initial state to it and any observer can check its current value or subscribe to it at any time to receive updates with the current value as it changes.
data class SomeState(
val isLoading: Boolean = false,
val data: String? = null,
)
val myState = MutableStateFlow<SomeState>(SomeState())
fun onLoadClicked() {
myState.update { currentState ->
currentState.copy(isLoading = true)
}
viewModelScope.launch {
val newData = repository.loadData()
myState.update { currentState ->
currentState.copy(isLoading = false, data = newData)
}
}
}
Note that in the above example we don’t update the state by assigning to its value directly, but rather use the update function to change it atomically. Assigning to value directly can lead to bugs if the state is being updated concurrently, especially when SomeState is a data class (a more detailed explanation of this possible bug can be found here).
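The atomicity of update can be illustrated outside Android with a sketch like the following, assuming kotlinx-coroutines-core 1.5.1 or later. Counter and concurrentUpdates are names made up for the example. Several coroutines increment the same state in parallel and no increment is lost.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.update

// Hypothetical state class for the sketch.
data class Counter(val count: Int = 0)

// Runs `workers` coroutines in parallel, each incrementing the state `perWorker` times.
fun concurrentUpdates(workers: Int, perWorker: Int): Int = runBlocking {
    val state = MutableStateFlow(Counter())
    coroutineScope {
        repeat(workers) {
            launch(Dispatchers.Default) {
                repeat(perWorker) {
                    // update retries the lambda if another writer got in between,
                    // so the read-copy-write sequence is applied atomically.
                    state.update { current -> current.copy(count = current.count + 1) }
                }
            }
        }
    }
    state.value.count
}

fun main() {
    println(concurrentUpdates(workers = 8, perWorker = 1_000))
}
```

Writing state.value = state.value.copy(...) instead would read and write in two separate steps, which is how concurrent updates can overwrite each other.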
You can also convert any flow to a StateFlow using the stateIn operator:
val stateFlow: StateFlow<SomeState> = someFlow
.stateIn(
scope = viewModelScope,
started = SharingStarted.WhileSubscribed(5000),
initialValue = someInitialValue,
)
The started parameter is used to specify the strategy that controls when sharing is started and stopped. WhileSubscribed allows you to configure a delay in milliseconds between the time the last subscriber disappears and the time we stop the upstream flows. This can be used for cases when the app goes to the background to save battery and other resources, and still avoid restarting the upstream flow during configuration changes.
You might have noticed that StateFlow is very similar to LiveData, yet there are two big differences you should keep in mind:
StateFlow requires an initial value
The method observe() from LiveData automatically unregisters the consumer when the view enters the STOPPED state. Flow collection is not stopped automatically, but this behaviour can be easily achieved with the repeatOnLifecycle extension.
Another hot observable is SharedFlow, a highly configurable generalization of StateFlow. It can be tuned with a replay parameter that specifies the number of old events to keep and replay for new subscribers, an extraBufferCapacity parameter that provides support for backpressure (fast emitters and slow subscribers), and a BufferOverflow strategy that either suspends the emitter or drops the oldest/newest emitted value. In its default form, it can be used to emit events that need to be handled once:
MutableSharedFlow(
replay = 0,
extraBufferCapacity = 0,
onBufferOverflow = BufferOverflow.SUSPEND
)
An important thing to keep in mind about SharedFlow is that, in the absence of a subscriber (for example, if the event is posted during a configuration change), events will be immediately dropped and won’t reach the new subscriber, so it should be used for events that must be processed immediately or not at all.
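This dropping behaviour can be reproduced with a short sketch, assuming kotlinx-coroutines-core and a SharedFlow in its default configuration (sharedFlowDemo is a hypothetical name). The event emitted before anyone subscribes is lost, and the subscriber only sees the later one.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.MutableSharedFlow
import kotlinx.coroutines.flow.first

// Returns the first event a late subscriber actually receives.
fun sharedFlowDemo(): Int = runBlocking {
    val events = MutableSharedFlow<Int>()   // replay = 0, no extra buffer
    events.emit(1)                          // no subscribers yet: the event is dropped

    // Start undispatched so the subscription exists before the next emit.
    val firstSeen = async(start = CoroutineStart.UNDISPATCHED) { events.first() }
    events.emit(2)                          // delivered to the subscriber
    firstSeen.await()
}

fun main() {
    println("first event received: ${sharedFlowDemo()}")
}
```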
For cases such as processing navigation events or showing a SnackBar, you might want to make sure that the event is retained until a subscriber appears. As suggested by Roman Elizarov in this article, this can be done by using a Channel: