I have a large, lazily generated sequence that is far too big to fit in memory.
I would like to process this sequence with coroutines to improve performance. In this example I am using 10 parallel threads for processing:
    runBlocking(Dispatchers.IO.limitedParallelism(10)) {
        massiveLazySequenceOfThings.forEach { myThingToProcess ->
            print("I am processing $myThingToProcess")
            launch {
                process(myThingToProcess)
            }
            print("I am done processing $myThingToProcess")
        }
    }
The problem here is that launch returns immediately, so the first print statement is executed (and a coroutine is queued) for EVERY item in the sequence. For extremely large sequences like mine, this causes an OOM.
Is there no way to make the iteration of my sequence "lazy" in this example, so that only a fixed number are being processed at any one time?
Am I forced to use channels here (possibly a buffered channel) to block the iteration of my sequence until some items have been processed? Or is there some other, cleaner solution that I am missing?
In my actual example, I am also using a supervisorScope to monitor each processing job, so if possible I would like to preserve that as well.
CodePudding user response:
The problem is probably that you are scheduling new tasks faster than the running ones can finish.
You can overcome this by chunking your data into the size that can be processed in parallel (10 in your example) and waiting for each chunk to finish before you start the next one:
    runBlocking(Dispatchers.IO) {
        massiveLazySequenceOfThings.chunked(10).forEach { chunk ->
            println("I am processing $chunk")
            chunk.map { async { process(it) } }.awaitAll()
        }
    }
Now the 10 elements of a chunk are processed in parallel, but the chunks themselves are processed sequentially, which makes sure that you do not run out of memory due to too many scheduled tasks.
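To check that the chunked loop really keeps the sequence lazy, here is a minimal, self-contained sketch. It assumes kotlinx-coroutines-core is on the classpath. The function maxOutstanding, the counters, and the delay(5) stand-in for process() are made up for illustration and are not part of the question's code.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Returns the largest number of items that had been pulled from the lazy
// sequence but not yet fully processed at any point during the run.
fun maxOutstanding(total: Int, chunkSize: Int): Int {
    val pulled = AtomicInteger(0)
    val done = AtomicInteger(0)
    var worst = 0

    // Hypothetical lazy source: an element is only produced when the consumer asks for it.
    val lazyItems = generateSequence(1) { it + 1 }
        .take(total)
        .onEach {
            pulled.incrementAndGet()
            worst = maxOf(worst, pulled.get() - done.get())
        }

    runBlocking(Dispatchers.IO) {
        lazyItems.chunked(chunkSize).forEach { chunk ->
            // delay(5) stands in for the real process() call.
            chunk.map { async { delay(5); done.incrementAndGet() } }.awaitAll()
        }
    }
    return worst
}

fun main() {
    println("max outstanding items: ${maxOutstanding(total = 35, chunkSize = 10)}")
}
```

Because awaitAll() suspends the loop until the current chunk has finished, the next chunk is not pulled from the sequence before that, so the number of outstanding items should never exceed the chunk size.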
One little side note: the output "I am done processing $myThingToProcess" in your example is not correct. At that point the job may still be running in its own coroutine, or may not even have started. The only thing you can say for sure is that you scheduled it.
CodePudding user response:
You can limit the number of parallel operations with a Semaphore (the one from kotlinx.coroutines.sync, whose acquire() suspends instead of blocking the thread).
    runBlocking(Dispatchers.IO) {
        val semaphore = Semaphore(permits = 10)
        massiveLazySequenceOfThings.forEach { myThingToProcess ->
            // Suspends the iteration until one of the 10 permits is free.
            semaphore.acquire()
            print("I am processing $myThingToProcess")
            launch {
                try {
                    process(myThingToProcess)
                } finally {
                    // Always hand the permit back, even if process() throws.
                    semaphore.release()
                }
            }
            print("I am done processing $myThingToProcess")
        }
    }
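Since the question mentions supervisorScope, here is one way the semaphore approach might be combined with it. This is only a sketch, assuming kotlinx-coroutines-core is available. The process() stand-in (which fails for every 7th item), the processAll function, and the counters are hypothetical and exist only to make the example runnable.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.sync.Semaphore
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical stand-in for the real work: fails for every 7th item.
suspend fun process(item: Int) {
    delay(10)
    if (item % 7 == 0) error("failed on $item")
}

// Processes all items with at most maxParallel in flight and
// returns the number of succeeded and failed items.
fun processAll(items: Sequence<Int>, maxParallel: Int): Pair<Int, Int> {
    val succeeded = AtomicInteger(0)
    val failed = AtomicInteger(0)
    // Called for each child of the supervisorScope that fails with an exception.
    val handler = CoroutineExceptionHandler { _, _ -> failed.incrementAndGet() }

    runBlocking(Dispatchers.IO) {
        val semaphore = Semaphore(permits = maxParallel)
        supervisorScope {
            items.forEach { item ->
                semaphore.acquire() // suspends the iteration until a permit is free
                launch(handler) {
                    try {
                        process(item)
                        succeeded.incrementAndGet()
                    } finally {
                        semaphore.release()
                    }
                }
            }
        }
    }
    return succeeded.get() to failed.get()
}

fun main() {
    val (ok, bad) = processAll(generateSequence(1) { it + 1 }.take(50), maxParallel = 10)
    println("succeeded=$ok failed=$bad")
}
```

Inside supervisorScope a failing child does not cancel its siblings or the loop, and the finally block still returns the permit, so the iteration keeps going after a failure. Unlike chunking, a slow item here only occupies one permit instead of holding up a whole batch.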