I want to collect values until I see the last page, but if the last page never arrives within a time limit, I want to send whatever has been collected so far.
I have a way of doing this, but it seems rather wasteful. I'm going to use it to build collections that may hold hundreds of thousands of values, so a more space-efficient method would be preferred.
You can copy and paste the following into a playground:
import UIKit
import Foundation
import Combine
var subj = PassthroughSubject<String, Never>()
let queue = DispatchQueue(label: "Test")
let collectForTime = subj
.collect(.byTime(queue, .seconds(10)))
let collectUntilLast = subj
.scan([String]()) { $0 + [$1] }
.first { $0.last == "LastPage" }
// whichever happens first
let cancel = collectForTime.merge(with: collectUntilLast)
.first()
.sink {
print("complete1: \($0)")
} receiveValue: {
print("received1: \($0)")
}
print("start")
let strings = [
"!@#$",
"ZXCV",
"LastPage", // comment this line to test to see what happens if no last page is sent
"ASDF",
"JKL:"
]
// if the last page is present then the items 0..<3 will be sent
// if there's no last page then send what we have
// the main thing is that the system is not just sitting there waiting for a last page that never comes.
for i in (0..<strings.count) {
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(i)) {
let s = strings[i]
print("sending \(s)")
subj.send(s)
}
}
CodePudding user response:
I made a small change to your technique
import Foundation
import Combine
var subj = PassthroughSubject<String, Never>()
let lastOrTimeout = subj
.timeout(.seconds(10), scheduler: RunLoop.main )
.print("watchdog")
.first { $0 == "LastPage" }
.append(Just("Done"))
let cancel = subj
.prefix(untilOutputFrom: lastOrTimeout)
.print("main_publisher")
.collect()
.sink {
print("complete1: \($0)")
} receiveValue: {
print("received1: \($0)")
}
print("start")
let strings = [
"!@#$",
"ZXCV",
"LastPage", // comment this line to test to see what happens if no last page is sent
"ASDF",
"JKL:"
]
// if the last page is present then the items 0..<3 will be sent
// if there's no last page then send what we have
// the main thing is that the system is not just sitting there waiting for a last page that never comes.
strings.enumerated().forEach { index, string in
DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(index)) {
print("sending \(string)")
subj.send(string)
}
}
lastOrTimeout will emit a value when it sees LastPage, or it finishes because of a timeout (and then emits Done).
The main pipeline collects values until the watchdog publisher emits a value, and then emits everything it has collected.
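To make the watchdog idea easier to see in isolation, here is a minimal synchronous sketch. It assumes two separate subjects with hypothetical names (pages for the data and stop standing in for the watchdog), so it does not depend on timers or on the order in which one subject notifies its subscribers.

```swift
import Combine

// Hypothetical names: `pages` carries the data, `stop` plays the watchdog's role.
let pages = PassthroughSubject<String, Never>()
let stop = PassthroughSubject<Void, Never>()

var collected: [String] = []

let cancellable = pages
    .prefix(untilOutputFrom: stop)  // pass values through until `stop` emits
    .collect()                      // buffer everything, emit once on completion
    .sink { collected = $0 }

pages.send("A")
pages.send("B")
stop.send(())    // the watchdog fires: the pipeline finishes and emits its buffer
pages.send("C")  // arrives after completion, so it is dropped

print(collected) // ["A", "B"]
```

Because collect() only emits once, when the upstream finishes, no intermediate arrays are published along the way.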
CodePudding user response:
UPDATE
After playing a bit more with the playground I think all you need is:
subj
.prefix { $0 != "LastPage" }
.append("LastPage")
.collect(.byTime(DispatchQueue.main, .seconds(10)))
I wouldn't use collect, because under the hood it is basically doing the same thing that scan is doing. You only need another condition in the first closure, e.g. .first { $0.last == "LastPage" || timedOut }, to emit the collected items in case of a timeout.
It's unfortunate that collect doesn't offer the API you need, but we can create another version of it.
The idea is to combineLatest the scan output with a stream that emits a Bool after a deadline (in reality we also need to emit false initially for combineLatest to start), and to || this additional value inside the first predicate.
Here is the code:
extension Publisher {
func collect<S: Scheduler>(
timeoutAfter interval: S.SchedulerTimeType.Stride,
scheduler: S,
orWhere predicate: @escaping ([Output]) -> Bool
) -> AnyPublisher<[Output], Failure> {
scan([Output]()) { $0 + [$1] }
.combineLatest(
Just(true)
.delay(for: interval, scheduler: scheduler)
.prepend(false)
.setFailureType(to: Failure.self)
)
.first { predicate($0) || $1 }
.map(\.0)
.eraseToAnyPublisher()
}
}
let subj = PassthroughSubject<String, Never>()
let cancel = subj
.collect(
timeoutAfter: .seconds(10),
scheduler: DispatchQueue.main,
orWhere: { $0.last == "LastPage" }
)
.print()
.sink { _ in }
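The timeout branch can be hard to observe with a real 10 second delay, so here is a small sketch of the same combineLatest and first combination with the deadline driven by hand. The deadline subject is a hypothetical stand-in for the delayed Just(true); everything else mirrors the extension above.

```swift
import Combine

let values = PassthroughSubject<String, Never>()
// Hypothetical stand-in for Just(true).delay(...): we trigger the deadline manually.
let deadline = PassthroughSubject<Bool, Never>()

var result: [String] = []

let cancellable = values
    .scan([String]()) { $0 + [$1] }          // running array of everything seen so far
    .combineLatest(deadline.prepend(false))  // false first, so combineLatest can start
    .first { $0.last == "LastPage" || $1 }   // stop on the last page or on the deadline
    .map(\.0)                                // keep only the collected array
    .sink { result = $0 }

values.send("A")
values.send("B")
deadline.send(true)      // simulate the timeout before any last page arrives
values.send("LastPage")  // ignored: first has already finished the pipeline

print(result) // ["A", "B"]
```

Note that scan builds a new array for every incoming value, so with very large collections this approach copies more than a single buffered collect would.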