I am struggling test and/or implement a method that listens to an infinite flow in the background. Specifically, the use case I have in mind is a repository for some data that has a local and remote data source. The local data source is the single source of truth. The repository exposes a flow of changes over time, which is really just the local data sources flow of changes over time. However, changes from the remote are also reflected in the local by collecting the remote source flow in the background.
I am trying to test the latter, but since the remote source flow is infinite, using runBlocking for the test makes it run forever. But I wouldn't be surprised if I am misunderstanding something else regarding coroutines and scope as well (rather, I would be surprised if I didn't). How would I go about making my test not wait forever? And am I making some fundamental error in the repository's observerValues
method that makes it not work as I think it is?
The following is my current confused attempt in an as small as possible non-working example:
package com.example.data.repositories
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.BeforeEach
import org.junit.jupiter.api.DisplayName
import org.junit.jupiter.api.Nested
import org.junit.jupiter.api.Test
import org.mockito.Mockito
import org.mockito.kotlin.argumentCaptor
import org.mockito.kotlin.mock
import org.mockito.kotlin.whenever
@DisplayName("FooRepository")
internal class FooRepositoryTest {
private val mockLocalDataSource: FooLocalDataSource = mock()
private val mockRemoteDataSource: FooRemoteDataSource = mock()
private var testRepository: FooRepository =
FooRepository(mockLocalDataSource, mockRemoteDataSource)
@BeforeEach
fun setUp() {
Mockito.reset(mockLocalDataSource)
Mockito.reset(mockRemoteDataSource)
testRepository = FooRepository(mockLocalDataSource, mockRemoteDataSource)
}
@Nested
@DisplayName("observeReferencePoints()")
inner class ObserveReferencePointsTest {
@ExperimentalCoroutinesApi
@Test
fun `acts on value emitted from remote flow`() = runBlocking {
val intCaptor = argumentCaptor<Int>()
val valuesActedOn = mutableListOf<Int>()
whenever(mockLocalDataSource.observeValues()).thenReturn(infiniteFlow(1, 60000, 0))
whenever(mockLocalDataSource.doSomething(intCaptor.capture())).then {
valuesActedOn.add(intCaptor.firstValue)
}
whenever(mockRemoteDataSource.observeValues()).thenReturn(infiniteFlow(7, 0, 60000))
val testFlow = testRepository.observeValues()
val collectedValues = mutableListOf<Int>()
val collectionJob = launch {
testFlow.collect { collectedValues.add(it) }
}
delay(100)
collectionJob.cancel()
assertEquals(listOf<Int>(), collectedValues)
assertEquals(listOf(7), valuesActedOn)
}
}
}
class FooRepository(
private val localDataSource: FooLocalDataSource,
private val remoteDataSource: FooRemoteDataSource,
) {
suspend fun observeValues(): Flow<Int> = coroutineScope {
val localFlow = localDataSource.observeValues()
launch {
remoteDataSource.observeValues().collect {
localDataSource.doSomething(it)
}
}
localFlow
}
}
class FooLocalDataSource {
private val values = mutableListOf<Int>()
fun observeValues(): Flow<Int> = infiniteFlow(5, 10000, 10000)
suspend fun doSomething(value: Int) {
delay(50)
values.add(value)
}
}
class FooRemoteDataSource {
fun observeValues(): Flow<Int> = infiniteFlow(3, 15000, 15000)
}
fun <T> infiniteFlow(item: T, delayBefore: Long, delayAfter: Long): Flow<T> = flow {
while (true) {
delay(delayBefore)
emit(item)
delay(delayAfter)
}
}
CodePudding user response:
suspend fun xxx(): Flow<T>
is a good indicator that something is flawed about the implementation. This is because Flow<T>
is designed as a cold stream, leaving specifics about the execution to the user.
One thing you need to be aware of is that coroutineScope
does not return until all the coroutines launched within it have finished.
Here this means the observeValues()
method does not return until remoteDataSource.observeValues().collect(...)
returns.
A correct implementation would look like:
fun observeValues(): Flow<Int> = flow {
coroutineScope { // allow parallel processing of remoteDataSource
val localFlow = localDataSource.observeValues()
launch {
remoteDataSource.observeValues().collect {
localDataSource.doSomething(it)
}
}
emitAll(localFlow) // Forward emitted data to outer flow
}
}