Coordinating emission and subscription in Kotlin coroutines with hot flows

Time:10-01

I am trying to design an observable task-like entity which would have the following properties:

  • Reports its current state changes reactively
  • Shares state and result events: new subscribers will also be notified if the change happens after they've subscribed
  • Has a lifecycle (backed by CoroutineScope)
  • Doesn't have suspend functions in the interface (because it has a lifecycle)

The very basic code is something like this:

import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class Worker {
  enum class State { Running, Idle }
  private val state = MutableStateFlow(State.Idle)
  private val results = MutableSharedFlow<String>()
  private val scope = CoroutineScope(Dispatchers.Default)

  private suspend fun doWork(): String {
    println("doing work")
    return "Result of the work"
  }

  fun start() {
    scope.launch {
      state.value = State.Running

      results.emit(doWork())

      state.value = State.Idle
    }
  }

  fun state(): Flow<State> = state

  fun results(): Flow<String> = results
}

The problems with this arise when I want to "start the work after I'm subscribed". There's no clear way to do that. The simplest thing doesn't work (understandably):

fun main() {
  runBlocking {
    val worker = Worker()
    // subscriber 1
    launch {
      worker.results().collect { println("received result $it") }
    }
    worker.start()
    // subscriber 2 can also be created "later" and watch
    // for state()/result() changes
  }
}

This prints only "doing work" and never prints a result. I understand why this happens: collect and start run in separate coroutines that are not synchronized in any way, so the result is emitted before the collector has subscribed.

Adding a delay(300) inside doWork "fixes" things and the result is printed, but I'd like this to work without artificial delays.

Another "solution" is to create a SharedFlow from results() and use its onSubscription to call start(), but that didn't work either the last time I tried.

My questions are:

  • Can this be turned into something that works or is this design initially flawed?
  • If it is flawed, can I take some other approach which would still hit all the goals I have specified in the beginning of the post?

Answer:

Your problem is that your SharedFlow has no replay cache (replay defaults to 0), so it delivers each result only to the collectors subscribed at that moment (initially none) and then forgets it. The MutableSharedFlow() function has a replay parameter that determines how many previous results it stores and replays to new collectors. You will need to decide what replay amount to use based on your use case for this class. For simply displaying the latest result in a UI, a common choice is a replay of 1.
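A minimal sketch of that change, assuming a replay of 1 fits the use case. The helper name lateSubscriberResult is hypothetical, and first() is used only so the example terminates; a real subscriber would call collect.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: emits before anyone subscribes, then subscribes late.
suspend fun lateSubscriberResult(): String {
    // replay = 1 keeps the most recent value for collectors that subscribe later
    val results = MutableSharedFlow<String>(replay = 1)

    // No collector exists yet; with the default replay = 0 this value would be dropped
    results.emit("Result of the work")

    // A late subscriber still receives the cached value
    return results.first()
}

fun main() = runBlocking {
    println("received result ${lateSubscriberResult()}")
}
```

With replay left at its default of 0, the same first() call would suspend forever, which matches the behaviour described in the question.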

Depending on your use case, you may want to give your CoroutineScope a SupervisorJob() in its context so that a failing child job doesn't cancel the whole scope.
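A rough sketch of the difference a SupervisorJob makes. The function name scopeSurvivesChildFailure and the exception handler are assumptions added for illustration; the handler is there only to keep the failure from being reported as an uncaught exception.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns whether the scope is still active after a child fails.
suspend fun scopeSurvivesChildFailure(): Boolean {
    // Receives the child's exception instead of the thread's uncaught-exception handler
    val handler = CoroutineExceptionHandler { _, e -> println("child failed: ${e.message}") }

    // With SupervisorJob, a failing child does not cancel the scope or its siblings
    val scope = CoroutineScope(Dispatchers.Default + SupervisorJob() + handler)

    // First task fails
    scope.launch { throw IllegalStateException("boom") }.join()

    // The scope can still run further work
    val stillActive = scope.isActive
    scope.launch { println("second task ran") }.join()

    scope.cancel()
    return stillActive
}

fun main() = runBlocking {
    println("scope still active: ${scopeSurvivesChildFailure()}")
}
```

With a plain Job (what CoroutineScope(Dispatchers.Default) creates by default), the first failure would cancel the scope and every later start() would silently do nothing.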

As a side note, your state() and results() functions should be properties by Kotlin convention, since they do nothing but return references. Personally, I would also have them return the read-only StateFlow/SharedFlow types instead of just Flow, to make it clear that they are not cold.
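Putting these suggestions together, the class could look roughly like the sketch below. The underscore-prefixed backing fields, the replay of 1, and the close() function are assumptions for illustration, not part of the original code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class Worker {
    enum class State { Running, Idle }

    // Mutable flows stay private; replay = 1 is an assumed choice
    private val _state = MutableStateFlow(State.Idle)
    private val _results = MutableSharedFlow<String>(replay = 1)
    private val scope = CoroutineScope(Dispatchers.Default + SupervisorJob())

    // Read-only views exposed as properties, typed as hot flows
    val state: StateFlow<State> = _state.asStateFlow()
    val results: SharedFlow<String> = _results.asSharedFlow()

    private suspend fun doWork(): String {
        println("doing work")
        return "Result of the work"
    }

    fun start() {
        scope.launch {
            _state.value = State.Running
            _results.emit(doWork())
            _state.value = State.Idle
        }
    }

    // Hypothetical addition: ends the worker's lifecycle
    fun close() = scope.cancel()
}

fun main() = runBlocking {
    val worker = Worker()
    worker.start()
    // Subscribing after start() still works because of the replay cache
    println("received result ${worker.results.first()}")
    worker.close()
}
```

Here the order of start() and subscription no longer matters for the latest result, at the cost of late subscribers also seeing a result that was produced before they arrived.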
