Home > Blockchain >  Replacing GlobalScope.launch with something better
Replacing GlobalScope.launch with something better

Time:10-09

I'm refactoring the following bit of code that's wrapping a CompletableFuture API into something that can be used with Coroutines, but it's using GlobalScope.launch { ... } which is discouraged:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
    val cf = CompletableFuture<T>()
    try {
        this.connectionPool.inTransaction { connection ->
            GlobalScope.launch {
                try {
                    cf.complete(f(connection))
                } catch (e: Throwable) {
                    cf.completeExceptionally(e)
                }
            }
            cf
        }
    } catch (e: Throwable) {
        log.error(e.message ?: "", e)
        cf.completeExceptionally(e)
    }
    return cf.await()
}

Getting rid of the CompletableFuture and replacing it with CompletableDeferred is the easy part:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
    val cdf = CompletableDeferred<T>()
    try {
        connectionPool.inTransaction { connection ->
            GlobalScope.launch {
                try {
                    cdf.complete(f(connection))
                } catch (e: Throwable) {
                    cdf.completeExceptionally(e)
                }
            }
            cdf.asCompletableFuture()
        }
    } catch (e: Throwable) {
        log.error(e.message ?: "", e)
        cdf.completeExceptionally(e)
    }
    return cdf.await()
}

The inTransaction API expects a CompletableFuture, I'm assuming this is for backwards compatibility with Java

override fun <A> inTransaction(f: (Connection) -> CompletableFuture<A>):
            CompletableFuture<A> =
        objectPool.use(configuration.executionContext) { it.inTransaction(f) }

Since I'm outside a CoroutineScope, I can't just call launch { ... } and since f is a suspend function, that section needs to be inside a CoroutineScope

Wrapping the connectionPool.inTransaction inside a coroutineScope in order to replace the GlobalScope locks up when it runs ...

    try {
        coroutineScope {
            connectionPool.inTransaction { connection ->
                launch {
                    try {
                        cdf.complete(f(connection))
                    } catch (e: Throwable) {
                        cdf.completeExceptionally(e)
                    }
                }
                cdf.asCompletableFuture()
            }
        }
    } catch (e: Throwable) {

Similarly with

    try {
        coroutineScope {
            connectionPool.inTransaction { connection ->
                async {
                    try {
                        cdf.complete(f(connection))
                    } catch (e: Throwable) {
                        cdf.completeExceptionally(e)
                    }
                }
                cdf.asCompletableFuture()
            }
        }
    } catch (e: Throwable) {

Adding some good ol' println debugging:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
        val cdf = CompletableDeferred<T>()
        println("1")
        try {
            println("2")
            coroutineScope {
                println("3")
                connectionPool.inTransaction { connection ->
                    println("4")
                    launch {
                        println("5")
                        try {
                            println("6")
                            cdf.complete(f(connection))
                            println("7")
                        } catch (e: Throwable) {
                            println("8")
                            cdf.completeExceptionally(e)
                            println("9")
                        }
                    }
                    println("10")
                    cdf.asCompletableFuture()
                }
            }
        } catch (e: Throwable) {
            log.error(e.message ?: "", e)
            cdf.completeExceptionally(e)
        }
        println("11")
        return cdf.await()
    }

Outputs:

1
2
3
11
4
10

followed by a stacktrace that the query timed out

timeout query item <postgres-connection-2> after 73751 ms and was not cleaned by connection as it should, will destroy it - timeout is 30000

This means the code inside the launch is never executing, similarly with async, I'm assuming some thread is getting blocked somewhere.

Replacing the GlobalScope.launch with CoroutineScope(Dispatchers.IO).launch { ... } works (similarly with Dispatchers.Unconfined)), but is this the correct solution here? If CompletableFuture is thread-blocking, then this is probably better than the GlobalScope.launch solution ...

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
        val cdf = CompletableDeferred<T>()
        try {
            connectionPool.inTransaction { connection ->
                CoroutineScope(Dispatchers.IO).launch {
                    try {
                        cdf.complete(f(connection))
                    } catch (e: Throwable) {
                        cdf.completeExceptionally(e)
                    }
                }
                cdf.asCompletableFuture()
            }
        } catch (e: Throwable) {
            log.error(e.message ?: "", e)
            cdf.completeExceptionally(e)
        }
        return cdf.await()
    }

Any suggestions on what the correct way is to get rid of that GlobalScope.launch?

CodePudding user response:

It's pretty complicated, but I believe the cause of launch()/async() not executing is that their parent coroutineScope() already finished at this point in time. Notice "11" happens before "4", meaning that you invoked launch() after going out of coroutineScope(). Which makes sense, because inTransaction() starts asynchronous operation, so it returns immediately, without waiting for the inner code. To fix this, you just need to move cdf.await() inside coroutineScope().

Another thing that concerns me is that you await on completable that you created by yourself and not the one returned from inTransaction(). Note that it may a totally different CompletableFuture and in that case you actually return before the operation completes.

Also, I'm not sure if this manual exception handling for completable makes any sense. async() already returns CompleteableDeferred with exception handling, so I'm not sure if converting it into another one does anything (maybe it actually does).

Try this code:

suspend fun <T> transaction(f: suspend (Connection) -> T): T {
    return coroutineScope {
        connectionPool.inTransaction { connection ->
            async { f(connection) }.asCompletableFuture()
        }.await()
    }
}
  • Related