How can you parallelize the processing of File InputStreams in Kotlin / Arrow?


I am processing large Files, having a list of them:

val originalFiles: List<File>

I need to read the InputStream of each file, process it, and write it to another processedFile. For the sake of simplicity let's assume I just read the original InputStream and write it to the destination file output stream.

I would like to process the originalFiles in parallel.

The first, straightforward way would be to use parallelStream():

override suspend fun processFiles(workdir: File): Either<Throwable, Unit> = either {
    
    val originalFiles = ...

    originalFiles.parallelStream().forEach {
        val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
        logger.info("Starting merge {}", destination)
        FileOutputStream(destination).use { faos ->
            IOUtils.copy(it.inputStream(), faos)
        }
        logger.info("Finished processing {}", destination)            
    }
}

However, given that I'm working with coroutines and Arrow, I get a compile warning "Possibly blocking call in non-blocking context could lead to thread starvation".

  • Is there a proper (non-blocking) way to work with Input/OutputStreams with coroutines/suspend functions?
  • Is there a better way to parallelize the List processing with coroutines/Arrow?

CodePudding user response:

Your best bet would be to use parTraverse (going to be renamed to parMap in 2.x.x). This function comes from Arrow Fx Coroutines; there are also Flow#parMap and Flow#parMapUnordered that you can use instead.
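
For the Flow-based variant, here is a minimal sketch. The processFile helper and the concurrency value of 4 are illustrative assumptions, not part of the original question; the per-file body would be the same copy logic as above:

import arrow.fx.coroutines.parMap
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.withContext
import java.io.File

// Hypothetical per-file step, standing in for the copy logic from the question.
suspend fun processFile(file: File): File = TODO()

suspend fun processAll(originalFiles: List<File>) {
  originalFiles.asFlow()
    .parMap(4) { file ->              // process up to 4 files concurrently
      withContext(Dispatchers.IO) {   // keep the blocking IO off the default dispatcher
        processFile(file)
      }
    }
    .collect()                        // drain the flow, which triggers the processing
}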

You also need to make sure that the FileOutputStream is closed correctly, even in the face of cancellation, and I would recommend using Resource for that, as sketched below.
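
A minimal sketch of that, assuming the Resource constructor and use combinator from Arrow Fx Coroutines 1.x (the exact API has shifted between Arrow versions, so treat the names as illustrative):

import arrow.fx.coroutines.Resource
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileOutputStream

// Acquire the stream and guarantee close(), even if the copy is cancelled midway.
fun outputStream(destination: File): Resource<FileOutputStream> =
  Resource({ FileOutputStream(destination) }) { stream, _ -> stream.close() }

suspend fun copyInto(destination: File, original: File) {
  outputStream(destination).use { faos ->
    IOUtils.copy(original.inputStream(), faos)
  }
}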

The "Possibly blocking call in non-blocking context could lead to thread starvation" warning will disappear once the blocking calls are invoked on Dispatchers.IO.

import arrow.fx.coroutines.parTraverse
import kotlinx.coroutines.Dispatchers
import org.apache.commons.io.IOUtils
import java.io.File
import java.io.FileOutputStream

suspend fun processFiles(workdir: File) {
  val originalFiles: List<File> = emptyList()
  // Run one copy per file in parallel, on the IO dispatcher so the blocking
  // stream calls never run on the default dispatcher.
  originalFiles.parTraverse(Dispatchers.IO) {
    val destination = File("${workdir.absolutePath}/${it.name}-processed.txt")
    logger.info("Starting merge {}", destination)
    // Close both streams, even if the copy fails part-way.
    it.inputStream().use { fis ->
      FileOutputStream(destination).use { faos ->
        IOUtils.copy(fis, faos)
      }
    }
    logger.info("Finished processing {}", destination)
  }
}

So, to summarise, the answers to your questions:

Is there a proper (non-blocking) way to work with Input/OutputStreams with coroutines/suspend functions?

Execute them from suspend functions running on Dispatchers.IO, for example as sketched below.
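
As a minimal illustration of that point, using only kotlin.io and kotlinx-coroutines (no Arrow; the function name is made up):

import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.withContext
import java.io.File

// Shift the blocking stream copy onto the IO dispatcher, so the warning goes away
// and the default dispatcher threads are not starved.
suspend fun copyBlocking(source: File, destination: File): Long =
  withContext(Dispatchers.IO) {
    source.inputStream().use { input ->
      destination.outputStream().use { output ->
        input.copyTo(output)   // blocking copy, but confined to Dispatchers.IO
      }
    }
  }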

Is there a better way to parallelize the List processing with coroutines/Arrow?

Leverage parTraverse to parallelize the List transformations with Kotlin Coroutines. Optionally, use parTraverseN if you also want to limit the number of parallel operations.
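
If you want to stay on parTraverse, one alternative sketch for capping parallelism (instead of parTraverseN) is to hand it a parallelism-limited view of Dispatchers.IO. Note that limitedParallelism requires kotlinx-coroutines 1.6+ and is marked experimental, and the limit of 4 here is an arbitrary assumption:

import arrow.fx.coroutines.parTraverse
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.ExperimentalCoroutinesApi
import java.io.File

@OptIn(ExperimentalCoroutinesApi::class)
suspend fun processAtMostFour(originalFiles: List<File>, workdir: File) {
  // At most 4 files are copied at the same time, all on IO threads.
  val limitedIO = Dispatchers.IO.limitedParallelism(4)
  originalFiles.parTraverse(limitedIO) { file ->
    val destination = File("${workdir.absolutePath}/${file.name}-processed.txt")
    file.inputStream().use { input ->
      destination.outputStream().use { output -> input.copyTo(output) }
    }
  }
}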
