Iterator which asynchronously fetches the next batch from a service [kotlin coroutines]

Time:11-09

I've implemented a synchronous iterator which fetches batches of data from an external service (each call may take some time or hang).

I'd like to fetch the next batch asynchronously while the calling code iterates through the entries of the current batch.

Ideally, I'd like to use Kotlin coroutines.

Desired process:

  1. The iterator fetches the 1st batch
  2. The iterator starts fetching 2nd batch in the background
  3. Meanwhile, the caller processes the first batch
  4. After processing the first batch, the caller can immediately process the 2nd batch, which was fetched by the iterator in the background
  5. The iterator starts fetching the 3rd batch in the background
  6. After processing the 2nd batch, the caller can immediately process the 3rd batch, which was fetched by the iterator in the background
  7. The iterator starts fetching the 4th batch in the background
  8. etc.

My synchronous implementation:

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return object : Iterator<LogPayloadType>, AutoCloseable {
            val logging: Logging = options.service
            var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            var i = 0
            var batch: List<LogEntry> = currentPage.values.toList()
            var isClosed = false

            override fun close() {
                logging.close()
                isClosed = true
            }

            override fun hasNext(): Boolean {
                if (isClosed) 
                    return false
                val hasNext = i < batch.size || currentPage.hasNextPage()
                if (!hasNext) {
                    logging.close()
                }
                return hasNext
            }

            override fun next(): LogPayloadType {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                // Advance only once the current batch is fully consumed, so its last entry isn't skipped
                if (i == batch.size && currentPage.hasNextPage()) {
                    currentPage = currentPage.nextPage
                    batch = currentPage.values.toList()
                    i = 0
                }
                val logEntry = batch[i++]
                return logEntry.getPayload()
            }
        }.asSequence()
    }

Of course, I'm open to completely different solutions as long as they can be wrapped in a Kotlin Sequence.

EDIT

Here is an asynchronous implementation which uses thread { }. I couldn't achieve this with coroutines.

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return object : Iterator<LogPayloadType> {
            private var currentPage: Page<LogEntry> = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            private var i = 0
            private var batch: List<LogEntry> = currentPage.values.toList()
            private var nextPage: Page<LogEntry>? = null
            private var job: Thread? = null

            override fun hasNext() = i < batch.size || currentPage.hasNextPage()

            override fun next(): LogPayloadType {
                if (!hasNext()) {
                    throw NoSuchElementException()
                }
                if (currentPage.hasNextPage()) {
                    if (nextPage == null && job == null) {
                        job = thread { nextPage = readNextPage(currentPage) }
                    }
                    if (i == batch.size) {
                        job!!.join()
                        currentPage = nextPage!!
                        job = thread { nextPage = readNextPage(currentPage) }
                        batch = currentPage.values.toList()
                        i = 0
                    }
                }
                val logEntry = batch[i++]
                return logEntry.getPayload<Payload<*>>()
            }

            private fun readNextPage(curPage: Page<LogEntry>): Page<LogEntry>? = curPage.nextPage

        }.asSequence()
    }

Answer:

Keep in mind I can't test any of the following code due to all the classes unknown to me that you're using, so there might be errors.

First of all, your code that uses threads can be simplified quite a bit using the sequence builder:

    import kotlin.concurrent.thread

    // Not sure how this should behave, since you treat it like a blocking function.
    // I return null when it's exhausted to simplify the while loop.
    private fun <T> readNextPageOrNull(page: Page<T>): Page<T>? =
        if (page.hasNextPage()) page.nextPage!! else null

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return sequence {
            var jobResult: Page<LogEntry>? = null
            // Fetch the first page in the background.
            var job = thread {
                jobResult = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            }
            while (true) {
                // Wait for the pending fetch; a null result means there are no more pages.
                job.join()
                val page = jobResult ?: break
                // Start fetching the next page before handing this one to the caller.
                job = thread { jobResult = readNextPageOrNull(page) }
                yieldAll(page.values.asIterable())
            }
        }
    }

You can simplify it further, and reuse a thread pool rather than starting a new thread for every page, by using CompletableFuture.supplyAsync {} instead of thread {}:

    import java.util.concurrent.CompletableFuture

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return sequence {
            var job = CompletableFuture.supplyAsync {
                logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            }
            while (true) {
                // join() blocks until the page is ready; null means there are no more pages.
                val page = job.join() ?: break
                job = CompletableFuture.supplyAsync { readNextPageOrNull(page) }
                yieldAll(page.values.asIterable())
            }
        }
    }
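
To try the prefetch pattern without the Logging client, here is a minimal, self-contained sketch using only the standard library. `FakePage`, `fetchFirstPage`, and `readItems` are made-up stand-ins (not part of any real API), and the `Thread.sleep` calls only simulate a slow service:

```kotlin
import java.util.concurrent.CompletableFuture

// Hypothetical stand-in for the paged client: each page holds a few values
// and can (slowly) load its successor.
class FakePage(private val index: Int, private val lastIndex: Int) {
    val values: List<String> = List(3) { "page$index-item$it" }

    fun hasNextPage(): Boolean = index < lastIndex

    fun nextPage(): FakePage {
        Thread.sleep(50) // simulate a slow service call
        return FakePage(index + 1, lastIndex)
    }
}

// Hypothetical equivalent of the initial listing call.
fun fetchFirstPage(lastIndex: Int): FakePage {
    Thread.sleep(50) // simulate a slow service call
    return FakePage(0, lastIndex)
}

fun readItems(lastIndex: Int): Sequence<String> = sequence {
    // The first fetch starts when the caller begins iterating.
    var pending: CompletableFuture<FakePage?> =
        CompletableFuture.supplyAsync<FakePage?> { fetchFirstPage(lastIndex) }
    while (true) {
        // Block until the prefetched page is ready; null means no more pages.
        val page = pending.join() ?: break
        // Kick off the next fetch before handing items to the caller.
        pending = CompletableFuture.supplyAsync<FakePage?> {
            if (page.hasNextPage()) page.nextPage() else null
        }
        yieldAll(page.values)
    }
}

fun main() {
    readItems(lastIndex = 2).forEach(::println)
}
```

Note that the sequence is lazy, so nothing is fetched until the caller starts iterating, and if the caller stops early, one page that was already requested in the background is simply discarded.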

We can convert this to use coroutines, but it is awkward to call coroutine code from blocking code: you have to use runBlocking. This doesn't buy you much, but it will use the coroutine Dispatcher thread pools that you're probably already using if you're using coroutines. Here, coroutineScope is whatever scope is appropriate in the current class.

    import kotlinx.coroutines.*

    fun readStructuredLogs(...): Sequence<Payload.JsonPayload> {
        return sequence {
            var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
                logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            }
            while (true) {
                // runBlocking bridges from the blocking sequence builder into the coroutine.
                val page = runBlocking { job.await() } ?: break
                job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
                yieldAll(page.values.asIterable())
            }
        }
    }

If you're already using coroutines, you might consider using a Flow instead of Sequence so it suspends instead of blocking when you're awaiting the next items. There might be a simpler way to do this using Flow operators, but I just did a quick modification of my above sequence code:

    import kotlinx.coroutines.*
    import kotlinx.coroutines.flow.*

    fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
        return flow {
            var job: Deferred<Page<LogEntry>?> = coroutineScope.async(Dispatchers.IO) {
                logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            }
            while (true) {
                // await() suspends instead of blocking the collector's thread.
                val page = job.await() ?: break
                job = coroutineScope.async(Dispatchers.IO) { readNextPageOrNull(page) }
                emitAll(page.values.asFlow())
            }
        }
    }

Edit: Possible way to do this with buffer:

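    import kotlinx.coroutines.Dispatchers
    import kotlinx.coroutines.flow.*

    fun readStructuredLogs(...): Flow<Payload.JsonPayload> {
        return flow {
            var page = logging.listLogEntries(Logging.EntryListOption.filter(buildLogFilter(...)))
            while (true) {
                emit(page)
                page = readNextPageOrNull(page) ?: break
            }
        }
            .flowOn(Dispatchers.IO) // fetch pages off the collector's thread
            .buffer(1)              // let the producer run ahead of the collector
            .flatMapConcat { it.values.asFlow() }
    }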
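
For experimenting with the buffered Flow variant on its own, here is a rough runnable sketch. It assumes the kotlinx-coroutines-core library is on the classpath. `loadPage` and `readItemsAsFlow` are hypothetical helpers standing in for the real paged service, and `transform` is used here in place of `flatMapConcat` only to keep the example free of preview APIs:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.toList
import kotlinx.coroutines.flow.transform
import kotlinx.coroutines.runBlocking

// Hypothetical page loader: returns the items of page `index`, or null past the end.
fun loadPage(index: Int, pageCount: Int): List<String>? {
    if (index >= pageCount) return null
    Thread.sleep(50) // simulate a blocking service call
    return List(2) { "page$index-item$it" }
}

fun readItemsAsFlow(pageCount: Int): Flow<String> =
    flow {
        var index = 0
        while (true) {
            val page = loadPage(index++, pageCount) ?: break
            emit(page) // emit whole pages so buffering happens per page
        }
    }
        .flowOn(Dispatchers.IO) // run the blocking loads off the collector's thread
        .buffer(1)              // small buffer so the producer can run ahead of the collector
        .transform { page -> page.forEach { emit(it) } } // flatten pages into items

fun main(): Unit = runBlocking {
    val items = readItemsAsFlow(pageCount = 3).toList()
    println(items)
}
```

Because the pages are produced in a separate coroutine, the next load can already be in progress while the collector is still working through the items of the current page.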