I'm exploring how to build a resource pool that suspends until a resource is available. This pool can be anything, from slots to allow API calls to socket connections to image processors or in this sample code, integers which could be available service counters:
val pool = Pool(1, 2, 3)
repeat(100) {
launch {
pool.borrow {
println(it)
delay(1000)
}
}
}
The borrow function grabs one item from the resource pool and returns it once it's done.
What I've done so far is this:
class Pool(vararg resource: Int) {
private val mutex = Mutex()
private val list = mutableListOf(*resource.toTypedArray())
private var available: CompletableDeferred<Boolean>? = null
suspend fun add(value: Int) {
mutex.withLock {
list.add(value)
available?.complete(true)
}
}
suspend fun rem(): Int {
mutex.withLock {
if (list.size == 1) {
available = CompletableDeferred()
} else {
available?.await()
}
return list.removeLast()
}
}
suspend fun borrow(handler: suspend (Int) -> Unit) {
val borrowed = rem()
try {
handler(borrowed)
} finally {
add(borrowed)
}
}
}
Both add
and rem
ove runs inside mutex.withLock { ... }
to ensure we don't get a concurrent modification exception when two threads are trying to modify the same list.
Initially, available is null, so any available?.await()
will be skipped due to the null check.
Once you remove the last item from the list (list.size == 1
), available is set to a CompletableDeferred
which means available?.await()
will now suspend until available?.complete(true)
is called.
Once more items are added, available?.complete(true)
is called again which will stop available?.await()
from suspending. If you try to remove an item, this code will deadlock, available?.await()
will suspend which means the mutex never exits preventing add
from being called again to allow the rem from unsuspending again.
If I move that available?.await()
before the mutex, two threads will try to remove an item from a list that might only have one item in it and the second thread will hit a List is empty
error.
What is the correct way to implement such a suspending resource pool?
CodePudding user response:
I think this would be a lot easier to build using a Channel. You don't even need to make add
suspending because you can use trySend
from any thread safely when the Channel has an unlimited capacity.
I also suggest making borrow
inline and removing suspend
from its function parameter. This avoids a function wrapper allocation when you use it. Since it's inline, you'll still be able to call suspend functions in the lambda you pass to it even if it's not a suspend lambda.
class Pool<T>(vararg initialResources: T) {
private val channel = Channel<T>(Channel.UNLIMITED)
init {
for (res in initialResources) {
channel.trySend(res)
}
}
fun add(value: T) {
channel.trySend(value)
}
suspend fun rem(): T = channel.receive()
suspend inline fun borrow(handler: (T) -> Unit) {
val borrowed = rem()
try {
handler(borrowed)
} finally {
add(borrowed)
}
}
}