RxSwift: share() alternative that guarantees single subscription on the upstream

Time:05-24

I've always thought .share(replay: 1, scope: .forever) shares the single upstream subscription no matter how many downstream subscribers there are.

However, I've just discovered that if the number of downstream subscriptions drops to zero, it stops "sharing" and disposes the upstream subscription (because refCount() is used under the hood). So when a new downstream subscription arrives, it has to re-subscribe to the upstream. In the following example:

import RxSwift

let sut = Observable<Int>
    .create { observer in
        // Runs once per upstream subscription
        print("create")
        observer.onNext(0)
        return Disposables.create()
    }
    .share(replay: 1, scope: .forever)

sut.subscribe().dispose()
sut.subscribe().dispose()

I would expect create to be printed just once, but it gets printed twice. If I remove the .dispose() calls, it is printed just once.

How do I set up the chain where the upstream is guaranteed to be subscribed at most once?

CodePudding user response:

I am not sure why .share(replay: 1, scope: .forever) is not giving the behaviour you want (I also thought it would work the way you describe), but what about this alternative without share?

// You will subscribe to this and not directly on sut (maybe hiding Subject interface to avoid onNext calls from observers)
let subject = ReplaySubject<Int>.create(bufferSize: 1)

let sut = Observable<Int>.create { obs in
  print("Performing work ...")
  obs.onNext(0)
  return Disposables.create()
}

// This subscription is hidden, happens only once and stays alive forever
_ = sut.subscribe(subject) // the returned Disposable is deliberately discarded

// Observers subscribe to the public stream
subject.subscribe().dispose()
subject.subscribe().dispose()
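
A minimal sketch of the "hide the Subject interface" idea from the first comment above, assuming a hypothetical wrapper type called SharedSource (it is not part of RxSwift, and the counter is only there for illustration). It subscribes to the upstream once in init and exposes only a read-only Observable. Note that if the upstream completes or errors, the subject terminates with it.

```swift
import RxSwift

// Hypothetical wrapper; the name and shape are illustrative only.
final class SharedSource<Element> {
    private let subject = ReplaySubject<Element>.create(bufferSize: 1)
    private let bag = DisposeBag()

    /// Read-only view: callers cannot push values with onNext.
    var values: Observable<Element> { subject.asObservable() }

    init(upstream: Observable<Element>) {
        // The only upstream subscription; it lives as long as this object.
        upstream.subscribe(subject).disposed(by: bag)
    }
}

var wrappedUpstreamSubscriptions = 0 // illustrative counter
let wrapped = SharedSource(upstream: Observable<Int>.create { observer in
    wrappedUpstreamSubscriptions += 1
    observer.onNext(0)
    return Disposables.create()
})

var wrappedReceived: [Int] = []
wrapped.values.subscribe(onNext: { wrappedReceived.append($0) }).dispose()
wrapped.values.subscribe(onNext: { wrappedReceived.append($0) }).dispose()
```

Both downstream subscribers get the replayed value, while the upstream closure runs only once.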

CodePudding user response:

The goal you describe implies you should be using multicast (or one of the operators that use it, such as publish(), replay(_:) or replayAll()) rather than share...

let sut = Observable<Int>
    .create { observer in
        print("create")
        observer.onNext(0)
        return Disposables.create()
    }
    .replay(1)

let disposable = sut.connect() // subscription will stay alive until dispose() is called on this disposable...

sut.debug("one").subscribe().dispose()
sut.debug("two").subscribe().dispose()
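
One way to manage the connection's lifetime is to put the disposable returned by connect() into a DisposeBag owned by whatever should keep the upstream alive. A minimal sketch; the counter and variable names are illustrative and not from the answer above:

```swift
import RxSwift

var connectedUpstreamSubscriptions = 0 // illustrative counter

let connectable = Observable<Int>
    .create { observer in
        connectedUpstreamSubscriptions += 1
        observer.onNext(0)
        return Disposables.create()
    }
    .replay(1) // ConnectableObservable<Int>; nothing is subscribed yet

// The upstream subscription lives exactly as long as this bag.
let connectionBag = DisposeBag()
connectable.connect().disposed(by: connectionBag)

var connectedReceived: [Int] = []
connectable.subscribe(onNext: { connectedReceived.append($0) }).dispose()
connectable.subscribe(onNext: { connectedReceived.append($0) }).dispose()
```

Because the connection is independent of the downstream subscriber count, disposing every subscriber does not tear down the upstream; only releasing the bag (or disposing the connection) does.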

To understand the difference between .forever and .whileConnected, read the documentation in the "ShareReplayScope.swift" file. Both are refcounted, but the difference is in how re-subscription operators are handled. Here is some test code to show the difference...

import XCTest
import RxSwift
import RxTest

class SandboxTests: XCTestCase {
    var scheduler: TestScheduler!
    var observable: Observable<String>!

    override func setUp() {
        super.setUp()
        scheduler = TestScheduler(initialClock: 0)
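        // Creates an observable that will error on the first subscription, then call `.onNext("A")` on the second.
        // Note: `createObservable(timeline:)` appears to be a custom marble-string helper, not a stock RxTest API.
        observable = scheduler.createObservable(timeline: "-#-A")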
    }

    func testWhileConnected() {
        // this shows that re-subscription gets through the while connected share to the source observable
        let result = scheduler.start { [observable] in
            observable!
                .share(scope: .whileConnected)
                .retry(2)
        }
        XCTAssertEqual(result.events, [
            .next(202, "A")
        ])
    }

    func testForever() {
        // however re-subscription doesn't get through on a forever share
        let result = scheduler.start { [observable] in
            observable!
                .share(scope: .forever)
                .retry(2)
        }
        XCTAssertEqual(result.events, [
            .error(201, NSError(domain: "Test Domain", code: -1))
        ])
    }
}
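
Since both scopes are ref-counted, the number of upstream subscriptions depends on whether the downstream count ever drops to zero. A minimal sketch of that behaviour for .forever, using an illustrative counter in place of print:

```swift
import RxSwift

var sharedUpstreamSubscriptions = 0 // illustrative counter

let shared = Observable<Int>
    .create { observer in
        sharedUpstreamSubscriptions += 1
        observer.onNext(0)
        return Disposables.create()
    }
    .share(replay: 1, scope: .forever)

// While at least one subscriber is alive, the upstream is shared.
let first = shared.subscribe()
let second = shared.subscribe()
let countWhileHeld = sharedUpstreamSubscriptions

// Dropping to zero subscribers disposes the upstream subscription...
first.dispose()
second.dispose()

// ...so the next subscriber triggers a fresh upstream subscription.
let third = shared.subscribe()
let countAfterDrop = sharedUpstreamSubscriptions
third.dispose()
```

Here countWhileHeld is 1 and countAfterDrop is 2, which matches the "printed twice" observation in the question.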