Here is a debounce example.
A value is discarded when another value arrives less than half a second after it.
// Each tuple is (value to send, delay in seconds after which it is sent).
let bounces: [(Int, TimeInterval)] = [
    (0, 0),
    (1, 0.25),  // 0.25s interval since last index
    (2, 1),     // 0.75s interval since last index
    (3, 1.25),  // 0.25s interval since last index
    (4, 1.5),   // 0.25s interval since last index
    (5, 2)      // 0.5s interval since last index
]

let subject = PassthroughSubject<Int, Never>()

// Only forward a value once 0.5s have passed without a newer value arriving.
cancellable = subject
    .debounce(for: .seconds(0.5), scheduler: RunLoop.main)
    .sink { index in
        print ("Received index \(index)")
    }

// Schedule every send up front; each delay is an offset added to the current time.
for bounce in bounces {
    DispatchQueue.main.asyncAfter(deadline: .now() + bounce.1) {
        subject.send(bounce.0)
    }
}
// Prints:
// Received index 1
// Received index 4
// Received index 5
But I want the discarded values to be combined with the value that is emitted instead of being dropped. My expected result is:
// Prints:
// Received index [0, 1]
// Received index [2, 3, 4]
// Received index [5]
Any help?
CodePudding user response:
You should not use debounce, as it is a filtering operation.
Instead, use the overload of collect that takes a TimeGroupingStrategy:
collect gathers all the elements from upstream into arrays.
// Gather upstream values into arrays using the by-time grouping strategy
// (0.5s stride, scheduled on the main run loop), then print each array.
let grouped = subject.collect(.byTime(RunLoop.main, 0.5))
cancellable = grouped.sink { group in
    print ("Received group \(group)")
}
CodePudding user response:
You can use scan
to accumulate the emitted values into an array. The trick is to reset the array once debounce has emitted it:
let subject = PassthroughSubject<Int, Never>()

// Set to true once debounce has published the current array, so that the next
// incoming value starts a fresh array instead of being appended to the old one.
var reset = false

let cancellable = subject
    .receive(on: RunLoop.main)
    .scan([Int]()) { collected, value in
        // Start over after an emission; otherwise append to the running array.
        reset ? [value] : collected + [value]
    }
    .handleEvents(receiveOutput: { _ in reset = false })
    .debounce(for: .seconds(0.5), scheduler: RunLoop.main)
    .handleEvents(receiveOutput: { _ in reset = true })
    .sink { indices in
        print ("Received indices \(indices)")
    }
There are two pitfalls with this approach, though:
- you need to switch the publisher to the main thread
- the outer variable and handleEvents are not very idiomatic.
That said, you can wrap the not-so-nice logic into its own publisher and make the call site a little more idiomatic:
extension Publishers {
    /// A publisher that accumulates upstream values into an array and publishes that
    /// array through `debounce`, starting a new array after each published one.
    struct DebouncedCollector<Upstream: Publisher, S: Scheduler>: Publisher {
        typealias Output = [Upstream.Output]
        typealias Failure = Upstream.Failure

        private let upstream: Upstream
        private let dueTime: S.SchedulerTimeType.Stride
        private let scheduler: S
        private let options: S.SchedulerOptions?

        /// - Parameters:
        ///   - upstream: The publisher whose values are collected.
        ///   - dueTime: The interval handed to `debounce`.
        ///   - scheduler: The scheduler used for both `receive(on:)` and `debounce`.
        ///   - options: Scheduler options forwarded to `debounce`.
        init(upstream: Upstream, dueTime: S.SchedulerTimeType.Stride, scheduler: S, options: S.SchedulerOptions?) {
            self.upstream = upstream
            self.dueTime = dueTime
            self.scheduler = scheduler
            self.options = options
        }

        // The generic parameter is named `Sub` so it does not shadow the struct's
        // scheduler type parameter `S`.
        func receive<Sub>(subscriber: Sub) where Sub: Subscriber, Failure == Sub.Failure, Output == Sub.Input {
            // Per-subscription flag: true right after debounce published an array,
            // telling scan to start a new array with the next value.
            var reset = false
            upstream
                .receive(on: scheduler)
                .scan([Upstream.Output]()) { collected, value in
                    reset ? [value] : collected + [value]
                }
                .handleEvents(receiveOutput: { _ in reset = false })
                .debounce(for: dueTime, scheduler: scheduler, options: options)
                .handleEvents(receiveOutput: { _ in reset = true })
                .receive(subscriber: subscriber)
        }
    }
}
extension Publisher {
    /// Wraps this publisher in a `Publishers.DebouncedCollector` configured with the
    /// given due time, scheduler and scheduler options.
    ///
    /// - Parameters:
    ///   - dueTime: The interval passed to the collector.
    ///   - scheduler: The scheduler passed to the collector.
    ///   - options: Scheduler options passed to the collector; defaults to `nil`.
    /// - Returns: The collector publisher built on top of `self`.
    func collectDebounced<S: Scheduler>(
        for dueTime: S.SchedulerTimeType.Stride,
        scheduler: S,
        options: S.SchedulerOptions? = nil
    ) -> Publishers.DebouncedCollector<Self, S> {
        return Publishers.DebouncedCollector(
            upstream: self,
            dueTime: dueTime,
            scheduler: scheduler,
            options: options
        )
    }
}
You can then use it like this:
// Send integers through the collecting debounce operator and print each array it emits.
let subject = PassthroughSubject<Int, Never>()
let debouncedBatches = subject.collectDebounced(for: .seconds(0.5), scheduler: RunLoop.main)
let cancellable = debouncedBatches.sink { indices in
    print ("Received indices \(indices)")
}