I'm trying to replicate a Worker Pool in Kotlin: https://gobyexample.com/worker-pools
It works wonderfully, but my problem is that I get an OutOfMemoryError because all object references from the worker coroutines are kept on the heap as long as the coroutine is running. How can I avoid this problem?
Here is my code:
I create a channel in ServiceA, and the workers process each item as soon as it is received from the channel.
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

class ServiceA {
    // Buffered channel: up to 10000 (url, image bytes) pairs can wait here at once.
    val channel = Channel<Pair<String, ByteArray>>(10000)
    private val coroutineScope = CoroutineScope(SupervisorJob() + Dispatchers.Default)

    @PostConstruct
    fun createWorkerGroup() {
        coroutineScope.launch {
            // Start 5 workers that all receive from the same channel.
            for (x in 1..5) {
                launch {
                    println("Create Worker $x")
                    while (true) {
                        uploadImage(channel.receive())
                    }
                }
            }
        }
    }

    private suspend fun uploadImage(urlAndImage: Pair<String, ByteArray>) {
        val (url, image) = urlAndImage
        println("Uploading Image: $url")
    }
}
In my controller method I send my data to a channel:
uploadService.channel.send(Pair(url, image.bytes))
CodePudding user response:
The worker pool can be automatically handled by the coroutine scope with an appropriate dispatcher.
If the image upload does blocking operations, you might want to use the IO dispatcher, like so: CoroutineScope(Dispatchers.IO.limitedParallelism(5)). I have omitted the SupervisorJob because you do not need it for the parent coroutine that will be created in createWorkerGroup(), but for the ones created by it. Don't forget to create the logic for cancelling the CoroutineScope when it is no longer needed.
After that, you can launch coroutines at will with very little overhead, in the same place you did before:
@PostConstruct
fun createWorkerGroup() {
    coroutineScope.launch {
        supervisorScope {
            channel.consumeEach {
                launch {
                    uploadImage(it)
                }
            }
        }
    }
}
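The cancellation logic mentioned earlier is not shown in any of the snippets, so here is a minimal, self-contained sketch of the whole lifecycle. It assumes a hypothetical class UploadWorkerPool standing in for ServiceA (no Spring annotations), a fake uploadImage that only delays, and a demo-only counter named uploaded. None of these names come from the original code.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.channels.consumeEach
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for ServiceA, without Spring annotations.
class UploadWorkerPool(parallelism: Int = 5, capacity: Int = 100) {
    val channel = Channel<Pair<String, ByteArray>>(capacity)
    val uploaded = AtomicInteger(0) // demo-only counter, not part of the original code

    // At most `parallelism` coroutines of this scope run at the same time.
    @OptIn(ExperimentalCoroutinesApi::class)
    private val scope = CoroutineScope(Dispatchers.IO.limitedParallelism(parallelism))

    // Starts the consumer and returns its Job so callers can wait for it.
    fun start(): Job = scope.launch {
        supervisorScope {
            // One child coroutine per received item; a failed upload
            // does not cancel its siblings inside supervisorScope.
            channel.consumeEach { item ->
                launch { uploadImage(item) }
            }
        }
    }

    private suspend fun uploadImage(urlAndImage: Pair<String, ByteArray>) {
        val (url, image) = urlAndImage
        delay(10) // placeholder for the real upload
        println("Uploaded $url (${image.size} bytes)")
        uploaded.incrementAndGet()
    }

    // In a Spring bean this could be annotated with @PreDestroy (assumption).
    fun shutdown() {
        channel.close()
        scope.cancel()
    }
}

// Sends 20 fake images, waits until all are processed, then shuts down.
fun runDemo(): Int = runBlocking {
    val pool = UploadWorkerPool()
    val consumer = pool.start()
    repeat(20) { i -> pool.channel.send("https://example.com/$i.png" to ByteArray(1024)) }
    pool.channel.close() // no more items: consumeEach ends once the buffer is drained
    consumer.join()      // supervisorScope waits for every child upload
    pool.shutdown()
    pool.uploaded.get()
}

fun main() {
    println("Uploaded ${runDemo()} images")
}
```

Closing the channel lets the consumer finish on its own after draining the buffer, while scope.cancel() is the hard stop for anything still running when the service goes away.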
The approach shown above is the correct way to create and use the worker pool, but you will need to test it in order to see if it eliminates the OutOfMemoryError. You might also want to try reducing the channel's capacity. Good luck!
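To see why the channel's capacity matters for memory, here is a small sketch. It assumes a hypothetical helper bufferedBeforeFull and 1 KB dummy payloads, neither of which is in the original code. A buffered channel keeps every queued element reachable until a worker receives it, so the capacity sets an upper bound on how many image arrays can sit on the heap waiting.

```kotlin
import kotlinx.coroutines.channels.Channel

// Hypothetical helper: counts how many payloads a buffered channel accepts
// when nothing is receiving from it.
fun bufferedBeforeFull(capacity: Int): Int {
    val channel = Channel<ByteArray>(capacity)
    var accepted = 0
    // trySend never suspends; it fails once the buffer is full.
    while (channel.trySend(ByteArray(1024)).isSuccess) {
        accepted++
    }
    channel.close()
    return accepted
}

fun main() {
    println(bufferedBeforeFull(10)) // prints 10
}
```

With Channel(10000) as in the question, up to 10000 images can be queued in memory at once. With a smaller capacity, send() in the controller suspends when the buffer is full, so the producer is slowed down instead of the heap filling up.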
CodePudding user response:
Thank you @Halex for the help. Here is my Kotlin coroutine worker pool with proper garbage collection:
@OptIn(ExperimentalCoroutinesApi::class)
private val superVisorScope = CoroutineScope(SupervisorJob() + Dispatchers.IO.limitedParallelism(5))

@PostConstruct
fun createWorkerGroup() {
    // Launch directly on the supervisor scope. Wrapping this in a second scope
    // and cancelling that scope right after launch() can cancel the outer
    // coroutine before it starts, in which case the workers never run.
    superVisorScope.launch {
        channel.consumeEach {
            launch {
                uploadImage(it)
            }
        }
    }
}