Home > Software engineering >  How this piece of code can be improved and rewritten to kotlin coroutines
How this piece of code can be improved and rewritten to kotlin coroutines

Time:12-30

I'm trying to achieve functionality: I have a rest endpoint that calls code that execution can take a lot of time. My idea to improve experience for now is to wrap that piece of code as a new thread, wait for completion or for some max time to elapse and return an appropriate message. Wrapped code should be completed even through endpoint already send message back. Current implementation looks like this:

private const val N = 1000
private const val MAX_WAIT_TIME = 5000

@RestController
@RequestMapping("/long")
class SomeController(
    val service: SomeService,
) {
    private val executor = Executors.newFixedThreadPool(N)

    @PostMapping
    fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
        val submit = executor.submit {
            service.longExecution(someParam)
        }
        val start = System.currentTimeMillis()

        while (System.currentTimeMillis() - start < MAX_WAIT_TIME) {
            if (submit.isDone)
                return ResponseEntity.ok("Done")
        }
        return ResponseEntity.ok("Check later")
    }
}

First question is - waiting on while for time seems wrong, we don't release thread, can it be improved?

More important question - how to rewrite it to Kotlin coroutines? My attempt, simple without returning as soon as task is done, looked like this:

    @PostMapping
    fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {

        val result = async {
            withContext(Dispatchers.Default) {
                service.longExecution(someParam)
            }
        }
        delay(MAX_WAIT_TIME)
        return@runBlocking ResponseEntity.ok(if(result.isCompleted) "Done" else "Check later")
    }

But even through correct string is returned, answer is not send until longExecution is done. How to fix that, what am I missing? Maybe coroutines are bad application here?

CodePudding user response:

Your implementation always waits for MAX_WAIT_TIME. This might work:

@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> = runBlocking {

    try {
        withTimeout(MAX_WAIT_TIME) {
            async {
                withContext(Dispatchers.Default) {
                    service.longExecution(someParam)
                }
            }
        }
    } catch (ex: CancellationException) {
        return@runBlocking ResponseEntity.ok("Check later")
    }
    return@runBlocking ResponseEntity.ok("Done")
}

Although I'm not sure if there will be any unwanted side effects because it seems that this will cancel the coroutine when it reaches MAX_WAIT_TIME. Read more about it here:
Cancellation and timeouts

CodePudding user response:

There are several problems with your current coroutines attempt:

  1. you are launching your async computation within runBlocking's scope, so the overall endpoint method will wait for child coroutines to finish, despite your attempt at return-ing before that.
  2. delay() will always wait for MAX_WAIT_TIME even if the task is done quicker than that
  3. (optional) you don't have to use runBlocking at all if your framework supports async controller methods (Spring WebFlux does support suspend functions in controllers)

For the first problem, remember that every time you launch a coroutine that should outlive your function, you have to use an external scope. coroutineScope or runBlocking are not appropriate in these cases because they will wait for your child coroutines to finish.

You can use the CoroutineScope() factory function to create a scope, but you need to think about the lifetime of your coroutine and when you want it cancelled. If the longExecution function has a bug and hangs forever, you don't want to leak the coroutines that call it and blow up your memory, so you should cancel those coroutines somehow. That's why you should store the scope as a variable in your class and cancel it when appropriate (when you want to give up on those operations).

For the second problem, using withTimeout is very common, but it doesn't fit your use case because you want the task to keep going even after you timeout waiting for it. One possible solution would be using select clauses to either wait until the job is done, or wait for some specified maximum time:

// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())

@PostMapping
fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
    val job = scope.launch {
        longExecution()
    }
    
    val resultText = runBlocking {
        select {
            job.onJoin() { "Done" }
            onTimeout(MAX_WAIT_TIME) { "Check later" } 
        }
    }
    return ResponseEntity.ok(resultText)
}

Note: I'm using launch instead of async because you don't seem to need the return value of longExecution here.


If you want to solve the problem #3 too, you can simply declare your handler suspend and remove runBlocking around the select:

// TODO call scope.cancel() somewhere appropriate (when this component is not needed anymore)
val scope = CoroutineScope(Job())

@PostMapping
suspend fun longEndpoint(@RequestParam("someParam") someParam: Long): ResponseEntity<String> {
    val job = scope.launch {
        longExecution()
    }
    
    val resultText = select {
        job.onJoin() { "Done" }
        onTimeout(MAX_WAIT_TIME) { "Check later" }
    }
    return ResponseEntity.ok(resultText)
}

Note that this requires spring-boot-starter-webflux instead of spring-boot-starter-web.

  • Related