How to "take(N)" iteratively - get a Sequence<Sequence>, each inner sequences having next N elements?
I am writing a high-load application in Kotlin.
I have tens of thousands of entries to insert to a database.
I want to batch them by, say, 1000.
So I created a loop:
val itemsSeq = itemsList.iterator().asSequence()
while (true) {
log.debug("Taking $BATCH_SIZE from $itemsSeq")
val batchSeq = itemsSeq.take(BATCH_SIZE)
val squareBatch = applySomething(batchSeq, something)
?: break
}
fun applySomething(batch: Sequence<Item>, something: Something) {
/* Fully consumes batch. Bulk-loads from DB by IDs, applies, bulk-saves. */
}
I thought that take()
would advance the itemsSeq
and the next call to take()
would give me a sequence "view" of itemsSeq
starting at the 10th item.
But with this code, I am getting:
DEBUG Taking 10 from kotlin.sequences.ConstrainedOnceSequence@53fe15ff
Exception in thread "main" java.lang.IllegalStateException: This sequence can be consumed only once.
at kotlin.sequences.ConstrainedOnceSequence.iterator(SequencesJVM.kt:23)
at kotlin.sequences.TakeSequence$iterator$1.<init>(Sequences.kt:411)
at kotlin.sequences.TakeSequence.iterator(Sequences.kt:409)
So it seems that the take()
"opens" the itemsSeq
again, while that can be consumed only once.
As a workaround, I can use chunked()
:
public fun <T> Sequence<T>.chunked(size: Int): Sequence<List<T>> {
But I would prefer not to create List
s, rather Sequence
s.
What I am looking for is something between take()
and chunked()
.
Is there anything such in Kotlin SDK?
I can possibly create my own sequence { ... }
but for readability, I would prefer something built-in.
CodePudding user response:
There is a way to construct a Sequence by handing it over an Iterator, see Sequence.
Given an iterator function constructs a Sequence that returns values through the Iterator provided by that function. The values are evaluated lazily, and the sequence is potentially infinite.
Wrapped in an extension function it could look like this:
fun <T> Iterable<T>.toValuesThroughIteratorSequence(): Sequence<T> {
val iterator = this.iterator()
return Sequence { iterator }
}
Quick test:
data class Test(val id: Int)
val itemsList = List(45) { Test(it) }
val batchSize = 10
val repetitions = itemsList.size.div(batchSize) 1
val itemsSeq = itemsList.toValuesThroughIteratorSequence()
(0 until repetitions).forEach { index ->
val batchSeq = itemsSeq.take(batchSize)
println("Batch no. $index: " batchSeq.map { it.id.toString().padStart(2, ' ') }.joinToString(" "))
}
Output:
Batch no. 0: 0 1 2 3 4 5 6 7 8 9
Batch no. 1: 10 11 12 13 14 15 16 17 18 19
Batch no. 2: 20 21 22 23 24 25 26 27 28 29
Batch no. 3: 30 31 32 33 34 35 36 37 38 39
Batch no. 4: 40 41 42 43 44
CodePudding user response:
Background
First of all, we need to be aware there is a big difference between an object that we can iterate over and object that represents a "live" or already running iteration process. First group means Iterable
(so List
, Set
and all other collections), Array
, Flow
, etc. Second group is mostly Iterator
or old Java Enumeration
. The difference could be also compared to file vs file pointer when reading or database table vs database cursor.
Sequence
belongs to the first group. Sequence object does not represent a live, already started iteration, but just a set of elements. These elements can be produced lazily, sequence could have unbounded size and usually internally it works by using iterators, but conceptually sequence is not an iterator itself.
If we look into the documentation about sequences it clearly compares them to Iterable
, not to Iterator
. All standard ways to construct sequences like: sequenceOf()
, sequence {}
, Iterable.asSequence()
produce sequences that return the same list of items every time we iterate over them. Iterator.asSequence()
also follows this pattern, but because it can't re-produce same items twice, it is intentionally protected against iterating multiple times:
public fun <T> Iterator<T>.asSequence(): Sequence<T> = Sequence { this }.constrainOnce()
Problem
Your initial attempt with using take()
didn't work, because this is a misuse of sequences. We expect that subsequent take()
calls on the same sequence object will produce exactly the same items (usually), not next items. Similarly as we expect multiple take()
calls on a list always produce same items, each time starting from the beginning.
Being more specific, your error was caused by above constrainOnce()
. When we invoke take()
multiple times on a sequence, it has to restart from the beginning, but it can't do this if it was created from an iterator, so Iterator.asSequence()
explicitly disallows this.
Simple solution
To fix the problem, you can just skip constrainOnce()
part, as suggested by @lukas.j. This solution is nice, because stdlib already provides tools like Sequence.take()
, so if used carefully, this is the easiest to implement and it just works.
However, I personally consider this a kind of workaround, because the resulting sequence doesn't behave as sequences do. It is more like an iterator on steroids than a real sequence. You need to be careful when using such sequence with existing operators or 3rd party code, because such sequence may work differently than they expect and as a result, you may get incorrect results.
Advanced solution
We can follow your initial attempt of using subsequent take()
calls. In this case our object is used for live iteration, so it is no longer a proper sequence, but rather an iterator. The only thing we miss in stdlib is a way to create a sub-iterator with a single chunk. We can implement it by ourselves:
fun main() {
val list = (0 .. 25).toList()
val iter = list.iterator()
while (iter.hasNext()) {
val chunk = iter.limited(10)
println(chunk.asSequence().toList())
}
}
fun <T> Iterator<T>.limited(n: Int): Iterator<T> = object : Iterator<T> {
var left = n
val iterator = this@limited
override fun next(): T {
if (left == 0)
throw NoSuchElementException()
left--
return iterator.next()
}
override fun hasNext(): Boolean {
return left > 0 && iterator.hasNext()
}
}
I named it limited()
, because take()
suggests we read items from the iterator. Instead, we only create another iterator on top of the provided iterator.
Of course, sequences are easier to use than iterators and typical solution to this problem is by using chunked()
. With above limited()
it is pretty straightforward to implement chunkedAsSequences()
:
fun main() {
val list = (0 .. 25).toList()
list.asSequence()
.chunkedAsSequences(10)
.forEach { println(it.toList()) }
}
fun <T> Sequence<T>.chunkedAsSequences(size: Int): Sequence<Sequence<T>> = sequence {
val iter = iterator()
while (iter.hasNext()) {
val chunk = iter.limited(size)
yield(chunk.asSequence())
chunk.forEach {} // required if chunk was not fully consumed
}
}
Please also note there is a tricky case of chunk being not fully consumed. chunkedAsSequences()
is protected against this scenario. Previous simpler solutions aren't.