Combine - merging multiple shared filters

Time:11-15

I've been working with RxSwift for a while and have just switched to Combine. I am trying to wrap my head around this specific .filter behaviour. Here's a short playground example:

import Combine

let publisher = [1, 2, 3, 4, 5]
    .publisher
    .share()

let filter1 = publisher
    .filter { $0 == 1 }
    .print("filter1")

let filter2 = publisher
    .filter { $0 == 2 }
    .print("filter2")

Publishers
    .Merge(filter1, filter2)
    .sink {
        print("Result is: \($0)")
    }

The output is:

filter1: receive subscription: (Multicast)
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: (Multicast)
filter2: request unlimited
filter2: receive finished

What surprises me is that "Result is: 2" is never printed, because the stream finishes first. If I remove the .share() operator, I receive both values, as I'd expect:

filter1: receive subscription: ([1])
filter1: request unlimited
filter1: receive value: (1)
Result is: 1
filter1: receive finished
filter2: receive subscription: ([2])
filter2: request unlimited
filter2: receive value: (2)
Result is: 2
filter2: receive finished

But what if my publisher is an API call and I don't want to create a duplicate network request? That is exactly the case I am trying to handle now, and it's why I need the .share() operator.

Is there a better explanation of why this is happening, and of how to handle a case where you want to filter a stream, apply separate logic to each filtered stream, and then merge the results back together?

CodePudding user response:

So there are a couple of different things going on here.

First, [1, 2, 3].publisher works differently from Observable.from([1, 2, 3]). The latter emits the values once per cycle, while the former emits all the values back to back. The Publisher example works more like this in Rx:

Observable<Int>.create { observer in
    [1, 2, 3, 4, 5].forEach {
        observer.onNext($0)
    }
    observer.onCompleted()
    return Disposables.create()
}

Because of this, in the Observable.from case, the emissions are not complete by the time the filter2 observable is subscribed to. So even if you omit the share() both "Result is: 1" and "Result is: 2" will be emitted.
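To see the "back to back" behaviour on the Combine side, here is a minimal sketch that records the order of events around a single sink call. The events array and its labels are only illustrative names, not part of the original question.

```swift
import Combine

// Records the order of events so the synchronous delivery is visible.
var events: [String] = []

events.append("before sink")
let cancellable = [1, 2, 3].publisher
    .sink(
        receiveCompletion: { _ in events.append("finished") },
        receiveValue: { events.append("value \($0)") }
    )
events.append("after sink")

// All values and the completion are recorded before "after sink".
print(events)
```

Everything, including the completion, is delivered inside the call to sink. With share() in front, that means the upstream is already done by the time a second subscriber attaches.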

Second, the share() operator also works differently. By default, the RxSwift share operator will reset the Observable once all subscriptions are disposed (it's a reference counting share). In the Combine case, the share operator makes the publisher connectable and then connects to it. Essentially, it's the same as the .share(replay: 0, scope: .forever) operator in RxSwift (something I have never needed in Rx BTW).

So the Rx code that is equivalent to the Combine code you posted is actually this:

import RxSwift

let observable = emitSequence([1, 2, 3, 4, 5])
    .share(replay: 0, scope: .forever)

let filter1ʹ = observable
    .filter { $0 == 1 }
    .debug("filterʹ1")

let filter2ʹ = observable
    .filter { $0 == 2 }
    .debug("filterʹ2")

Observable.merge(filter1ʹ, filter2ʹ)
    .subscribe(onNext: {
        print("Resultʹ is: \($0)")
    })

func emitSequence<S>(_ sequence: S) -> Observable<S.Element> where S: Sequence {
    Observable.create { observer in
        sequence.forEach {
            observer.onNext($0)
        }
        observer.onCompleted()
        return Disposables.create()
    }
}
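On the Combine side, since share() connects as soon as the first subscriber arrives, one possible workaround for a synchronous source is to make the publisher connectable yourself and call connect() only after every branch has subscribed. This is a sketch of that idea using makeConnectable(), not something from the original question. The results array is just an illustrative way to collect the output.

```swift
import Combine

var results: [Int] = []

// makeConnectable() holds the source back until connect() is called.
let source = [1, 2, 3, 4, 5]
    .publisher
    .makeConnectable()

let filter1 = source.filter { $0 == 1 }
let filter2 = source.filter { $0 == 2 }

// Both filtered branches subscribe here, but no values flow yet.
let subscription = Publishers
    .Merge(filter1, filter2)
    .sink { results.append($0) }

// Start the single upstream subscription now that both branches are attached.
let connection = source.connect()

print(results)
```

Because the upstream starts only after both filters are attached, each branch sees every value and the merged stream contains both 1 and 2, while the source is still subscribed to only once.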

All this said, the practical case of an API call is fine. There, the assumption is that the call won't return immediately (it will take at least one cycle), and since it's one-shot, the fact that share() doesn't reset isn't a problem as long as you make sure you aren't resubscribing to the publisher.
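A rough way to check this without a real network call is to let a PassthroughSubject stand in for the request, so that values arrive only after both branches have subscribed. The subject, the map steps, and the results array below are assumptions made for illustration. A real API publisher would deliver its response asynchronously instead of through send(_:).

```swift
import Combine

var results: [Int] = []
var finished = false

// Hypothetical stand-in for a request whose response arrives later.
let response = PassthroughSubject<Int, Never>()
let shared = response.share()

// Each branch filters the shared stream and applies its own logic.
let branch1 = shared.filter { $0 == 1 }.map { $0 * 10 }
let branch2 = shared.filter { $0 == 2 }.map { $0 * 100 }

let subscription = Publishers
    .Merge(branch1, branch2)
    .sink(
        receiveCompletion: { _ in finished = true },
        receiveValue: { results.append($0) }
    )

// The "response" shows up only after both branches are subscribed.
response.send(1)
response.send(2)
response.send(completion: .finished)

print(results, finished)
```

Here both branches receive their values through one shared upstream subscription, because nothing is emitted until after the merge has subscribed to both filters.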