I am processing large Files and have a list of them:
val originalFiles: List<File>
I need to read the InputStream of each file, process it, and write it to another processedFile. For the sake of simplicity, let's assume I just read the original InputStream and write it to the destination file's output stream.
I would like to process the originalFiles in parallel.
The first straightforward way would be to use parallelStream():
override suspend fun processFiles(workdir: File): Either<Throwable, Unit> = either {
    val originalFiles = ...
    originalFiles.parallelStream().forEach {
        val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
        logger.info("Starting merge {}", destination)
        FileOutputStream(destination).use { faos ->
            IOUtils.copy(it.inputStream(), faos)
        }
        logger.info("Finished processing {}", destination)
    }
}
However, given that I'm working with coroutines and Arrow, I get a compile warning: Possibly blocking call in non-blocking context could lead to thread starvation.
- Is there a proper (non-blocking) way to work with Input/OutputStreams with coroutines/suspend functions?
- Is there a better way to parallelize the List processing with coroutines/Arrow?
CodePudding user response:
Your best bet would be to use parTraverse (going to be renamed to parMap in 2.x.x). This function comes from Arrow Fx Coroutines; there are also Flow#parMap and Flow#parMapUnordered that you can use instead.
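For comparison, here is a minimal sketch of the Flow-based route, assuming the Flow#parMap(concurrency, transform) extension from Arrow Fx Coroutines 1.x; the processFilesAsFlow name and the concurrency limit of 4 are illustrative, not part of the original answer:

import arrow.fx.coroutines.parMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.withContext
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileOutputStream

// Hypothetical Flow-based variant: each file is copied on Dispatchers.IO,
// with at most 4 copies in flight at once (the limit is illustrative).
suspend fun processFilesAsFlow(workdir: File, originalFiles: List<File>) {
    originalFiles.asFlow()
        .parMap(concurrency = 4) { original ->
            withContext(Dispatchers.IO) {
                val destination = File("${workdir.absolutePath}/${original.name}-processed.txt")
                FileOutputStream(destination).use { faos ->
                    IOUtils.copy(original.inputStream(), faos)
                }
            }
        }
        .toList() // terminal operator: nothing runs until the flow is collected

parMapUnordered has the same shape but does not preserve the order of the results.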
You also need to make sure that the FileOutputStream is closed correctly, even in the face of cancellation, and I would recommend using Resource for that.
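A minimal sketch of that idea, assuming the Resource(acquire, release) constructor and Resource#use as they appear in Arrow Fx Coroutines 1.x; the copyToProcessed helper is made up for illustration:

import arrow.fx.coroutines.Resource
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileOutputStream

// Hypothetical helper: the output stream is acquired as a Resource, so it is
// released on normal completion, on failure, and on cancellation alike.
suspend fun copyToProcessed(original: File, destination: File) {
    Resource({ FileOutputStream(destination) }) { faos, _ -> faos.close() }
        .use { faos -> IOUtils.copy(original.inputStream(), faos) }
}

The input stream could be wrapped the same way; it is left as-is here to stay close to the snippets above.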
The Possibly blocking call in non-blocking context could lead to thread starvation warning will disappear once the blocking calls are invoked on the correct dispatcher, Dispatchers.IO.
suspend fun processFiles(workdir: File): Unit {
    val originalFiles: List<File> = emptyList<File>()
    originalFiles.parTraverse(Dispatchers.IO) {
        val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
        logger.info("Starting merge {}", destination)
        FileOutputStream(destination).use { faos ->
            IOUtils.copy(it.inputStream(), faos)
        }
        logger.info("Finished processing {}", destination)
    }
}
So, to summarise, the answers to your questions:
Is there a proper (non-blocking) way to work with Input/OutputStreams with coroutines/suspend functions?

Execute them from suspend functions on Dispatchers.IO.
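As a minimal sketch of that advice for a single blocking copy (plain kotlinx.coroutines, no Arrow; the helper name is made up):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileOutputStream

// Hypothetical helper: the blocking stream copy is shifted onto Dispatchers.IO,
// so it no longer runs on (and starves) the default coroutine dispatcher.
suspend fun copyOnIo(original: File, destination: File) {
    withContext(Dispatchers.IO) {
        FileOutputStream(destination).use { faos ->
            IOUtils.copy(original.inputStream(), faos)
        }
    }
}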
Is there a better way to parallelize the List processing with coroutines/Arrow?

Leverage parTraverse to parallelise the List transformations in Kotlin Coroutines. Optionally, use parTraverseN if you also want to limit the amount of parallel processes.
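A minimal sketch of that last option, assuming the parTraverseN(ctx, n, f) overload from Arrow Fx Coroutines 1.x (check the exact signature against your Arrow version); the limit of 4 and the processFilesLimited name are illustrative:

import arrow.fx.coroutines.parTraverseN
import kotlinx.coroutines.Dispatchers
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileOutputStream

// Hypothetical variant of processFiles: copies run on Dispatchers.IO, with at
// most 4 files being processed at any one time.
suspend fun processFilesLimited(workdir: File, originalFiles: List<File>) {
    originalFiles.parTraverseN(Dispatchers.IO, 4) { original ->
        val destination = File("${workdir.absolutePath}/${original.name}-processed.txt")
        FileOutputStream(destination).use { faos ->
            IOUtils.copy(original.inputStream(), faos)
        }
    }
}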