Mix of distinctUntilChanged() and throttle(_ dueTime: RxTimeInterval, latest: Bool = true, scheduler

Time:10-26

Is it possible to build a mechanism in RxSwift that works like a mix of
func distinctUntilChanged() and func throttle(_ dueTime: RxTimeInterval, latest: Bool = true, scheduler: SchedulerType)?

Let's assume many signals are emitted to the subscription, for example A, B, C, D, B, D, A, X, C, Z ..., and that a signal which occurs several times within a specified timespan should be passed only once. In practice this means that if I set the timespan to 5 s and the value A occurs 3 times during that timespan, A is passed only once; after the timespan has elapsed the mechanism resets and continues working in the same manner. The timespan starts at the first occurrence of A. Meanwhile, all other signals (D, C, ...) are handled the same way, each with its own timespan.
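To pin down the behaviour being asked for, here is a minimal plain-Swift model of it, with no RxSwift involved. The function name passOncePerWindow and the timestamped input are hypothetical, made up for illustration, and the sketch assumes that the first occurrence of a value is the one that gets passed:

```swift
// Hypothetical model of the requested behaviour (not RxSwift API).
// An event is kept only if the same value was not kept within the
// previous `window` seconds; every value has its own window.
func passOncePerWindow<T: Hashable>(
    _ events: [(time: Double, value: T)],
    window: Double
) -> [(time: Double, value: T)] {
    var windowStart: [T: Double] = [:]   // when each value last got through
    var passed: [(time: Double, value: T)] = []
    for event in events {
        if let start = windowStart[event.value], event.time - start < window {
            continue                     // still inside this value's window: drop
        }
        windowStart[event.value] = event.time   // open a new window for this value
        passed.append(event)
    }
    return passed
}

let input: [(time: Double, value: String)] = [
    (0, "A"), (1, "B"), (2, "A"), (4, "A"), (6, "A"), (7, "B")
]
let output = passOncePerWindow(input, window: 5)
print(output.map { "\($0.value)@\(Int($0.time))" })
// ["A@0", "B@1", "A@6", "B@7"]
```

The As at 2 s and 4 s fall inside the window opened by the A at 0 s, so they are dropped; the A at 6 s opens a new window. B is tracked independently of A.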

Let me know if this is feasible.

CodePudding user response:

Likely something very much like this will do what you want:

func example<T>(scheduler: SchedulerType, dueTime: RxTimeInterval, source: Observable<T>) -> Observable<T> where T: Hashable {
    source.groupBy(keySelector: { $0 })
        .flatMap { $0.throttle(dueTime, latest: false, scheduler: scheduler) }
}
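The idea is that groupBy splits the stream into one sub-stream per distinct value, throttle with latest: false lets the first element of each sub-stream through and drops repeats until dueTime has elapsed, and flatMap merges the sub-streams back together. A small usage sketch of the same chain, written inline so that it stands alone; the PublishSubject, the received array and the choice of MainScheduler.instance are assumptions made for illustration:

```swift
import RxSwift

let subject = PublishSubject<String>()
var received: [String] = []

// Same operator chain as in example(scheduler:dueTime:source:), written inline.
let subscription = subject
    .groupBy(keySelector: { $0 })
    .flatMap { $0.throttle(.seconds(5), latest: false, scheduler: MainScheduler.instance) }
    .subscribe(onNext: { received.append($0) })

subject.onNext("A") // first A: passes and opens A's 5 s window
subject.onNext("A") // inside A's window: dropped
subject.onNext("B") // first B: passes, B has its own window
subject.onNext("A") // still inside A's window: dropped

print(received) // ["A", "B"]
subscription.dispose()
```

One thing to keep in mind with this approach: groupBy keeps a group alive for every distinct value it has seen for as long as the subscription lives, so a source with an unbounded number of distinct values will hold on to more and more groups.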

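Here are some tests and their results. createObservable(timeline:), parseEventsAndTimes and offsetTime(by:) are marble-test helpers that are not defined in this post. Judging by the results, each character of a timeline, including "-", stands for one second of virtual time, so the two Bs are 6 seconds apart and the two Ds are 4 seconds apart: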

final class ExampleTests: XCTestCase {
    // at three seconds, everything gets through
    func testAt3() {
        let scheduler = TestScheduler(initialClock: 0)
        let source = scheduler.createObservable(timeline: "A-B-C-D-B-D-A-X-C-Z")
        let expected = parseEventsAndTimes(timeline:      "A-B-C-D-B-D-A-X-C-Z", values: { String($0) })
            .offsetTime(by: 200)[0]
        let result = scheduler.start {
            example(scheduler: scheduler, dueTime: .seconds(3), source: source)
        }
        XCTAssertEqual(result.events, expected)
    }

    // at five seconds, the extra D is stopped
    func testAt5() {
        let scheduler = TestScheduler(initialClock: 0)
        let source = scheduler.createObservable(timeline: "A-B-C-D-B-D-A-X-C-Z")
        let expected = parseEventsAndTimes(timeline:      "A-B-C-D-B---A-X-C-Z", values: { String($0) })
            .offsetTime(by: 200)[0]
        let result = scheduler.start {
            example(scheduler: scheduler, dueTime: .seconds(5), source: source)
        }
        XCTAssertEqual(result.events, expected)
    }

    // at seven seconds, the extra B and D are both stopped.
    func testAt7() {
        let scheduler = TestScheduler(initialClock: 0)
        let source = scheduler.createObservable(timeline: "A-B-C-D-B-D-A-X-C-Z")
        let expected = parseEventsAndTimes(timeline:      "A-B-C-D-----A-X-C-Z", values: { String($0) })
            .offsetTime(by: 200)[0]
        let result = scheduler.start {
            example(scheduler: scheduler, dueTime: .seconds(7), source: source)
        }
        XCTAssertEqual(result.events, expected)
    }
}
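For readers who do not have those marble helpers, a similar check can be sketched with only the stock RxTest API (createHotObservable and Recorded events). The class name, the event times and the values below are made up for illustration, and the operator chain is written inline instead of calling example(...) so that the block stands alone:

```swift
import XCTest
import RxSwift
import RxTest

final class PerValueThrottleTests: XCTestCase {
    func testRepeatsInsideWindowAreDropped() {
        let scheduler = TestScheduler(initialClock: 0)
        // scheduler.start subscribes at virtual time 200 by default,
        // so all events are placed after that.
        let source = scheduler.createHotObservable([
            Recorded.next(210, "A"),
            Recorded.next(212, "B"),
            Recorded.next(213, "A"), // 3 s after the last A that passed: dropped
            Recorded.next(216, "A"), // 6 s after the last A that passed: passes
            Recorded.next(220, "B")  // 8 s after the last B that passed: passes
        ])
        let result = scheduler.start {
            source
                .groupBy(keySelector: { $0 })
                .flatMap { $0.throttle(.seconds(5), latest: false, scheduler: scheduler) }
        }
        XCTAssertEqual(result.events, [
            .next(210, "A"),
            .next(212, "B"),
            .next(216, "A"),
            .next(220, "B")
        ])
    }
}
```

With the default TestScheduler resolution, one second corresponds to one unit of virtual time, which is why .seconds(5) lines up with the integer event times.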