Combine - merge 2 publishers into a single publisher that returns a value when either underlying pub

Time:01-14

I have 2 publishers that return the same type of value. One publisher is meant to emit network responses and the other is meant to emit cached responses. These publishers will be shared across multiple requests by different consumers.

I emit values on them from different code paths, and I am trying to merge these 2 upstream publishers into a single publisher that delivers a value from either of the 2 to a downstream consumer.

I tried using MergeMany, but it did not work for me: Publisher1 was sending values but Publisher2 was not, and the downstream sink did not fire.

I tried using CombineLatest, but that did not work for two reasons: (1) it returns a tuple like (Response, Response) instead of just Response, and (2) it waits until both publishers have emitted a value. I can't use Zip for the same reasons.
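The CombineLatest behaviour described above can be reproduced with a minimal sketch. The subject names `first` and `second` and the `String` output type are assumptions made for illustration; they stand in for the two publishers in the question.

```swift
import Combine

// Hypothetical stand-ins for the two upstream publishers.
let first = PassthroughSubject<String, Never>()
let second = PassthroughSubject<String, Never>()

// Collects every tuple that combineLatest delivers.
var pairs: [(String, String)] = []

let cancellable = first
    .combineLatest(second)
    .sink { pairs.append($0) }

// Only `first` has emitted, so combineLatest has nothing to pair it with yet.
first.send("a")
let countAfterFirst = pairs.count

// Once `second` emits, a tuple of the latest values is delivered.
second.send("b")
```

Because the output is a tuple and nothing is delivered until both sides have emitted, CombineLatest does not fit the "value from either publisher" requirement.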

I saw switchToLatest() which sounds promising, but I could not find a way to use it on a list of Publishers.

Edit:

I also tried this and got a result similar to MergeMany:

return networkPublisher
//    .merge(with: cachedPublisher) // If this is uncommented then the network publisher does not process all the network responses for some reason. 
    .eraseToAnyPublisher()

CodePudding user response:

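First, the example in your own answer should work even without subscribe(on:) and receive(on:). If it does not, the most likely reason is that your implementation is not thread-safe, and I think that is your main problem. Also note that the two operators do different things: subscribe(on:) sets the scheduler for upstream operations (subscribe, request, cancel), while receive(on:) sets the scheduler on which values are delivered downstream. Values sent to a subject from another thread are not moved by subscribe(on:) alone, so check which of the two your example actually needs.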

Below are two variants for merging two publishers:

let subscription = publisher1.merge(with: publisher2)
    .sink(receiveValue: { value in
        // your logic
    })

OR

let subscription = Publishers.Merge(publisher1, publisher2)
    .sink(receiveValue: { value in
        // your logic
})
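To make the merge behaviour concrete, here is a small self-contained sketch. The `Response` struct and the two subjects are hypothetical stand-ins for the types in the question, not the asker's actual code.

```swift
import Combine

// Hypothetical stand-in for the question's Response type.
struct Response: Equatable {
    let source: String
    let body: String
}

// Subjects stand in for the shared network and cache publishers.
let networkSubject = PassthroughSubject<Response, Never>()
let cachedSubject = PassthroughSubject<Response, Never>()

// Collects everything the merged publisher delivers.
var received: [Response] = []

let cancellable = networkSubject
    .merge(with: cachedSubject)
    .sink { response in
        received.append(response)
    }

// The cache emits first; merge forwards it without waiting for the network.
cachedSubject.send(Response(source: "cache", body: "stale"))
// The network value is forwarded as well when it arrives.
networkSubject.send(Response(source: "network", body: "fresh"))
```

Everything here runs synchronously on one thread, so each send reaches the sink immediately and in order. If the real subjects are sent to from several threads, that ordering is no longer guaranteed.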

I hope this helps.

CodePudding user response:

All I had to do was add receive(on:) and subscribe(on:) to the chain:

return networkPublisher
    .merge(with: cachedPublisher) 
    .receive(on: dispatchQueue)
    .subscribe(on: dispatchQueue)
    .eraseToAnyPublisher()
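A runnable sketch of this chain is shown below. It assumes `String` responses, a serial queue with the made-up label `responses.serial`, and a hypothetical helper `mergedResponses()`. It deliberately leaves out subscribe(on:), because that operator makes the subscription itself asynchronous, and a PassthroughSubject drops values sent before the subscription is in place.

```swift
import Combine
import Foundation

// Hypothetical stand-ins for the shared publishers in the question.
let networkPublisher = PassthroughSubject<String, Never>()
let cachedPublisher = PassthroughSubject<String, Never>()

// A DispatchQueue is serial unless created with the .concurrent attribute.
let dispatchQueue = DispatchQueue(label: "responses.serial")

// Hypothetical helper that builds the merged publisher.
func mergedResponses() -> AnyPublisher<String, Never> {
    networkPublisher
        .merge(with: cachedPublisher)
        .receive(on: dispatchQueue)   // deliver every value on the serial queue
        .eraseToAnyPublisher()
}

var received: [String] = []
let done = DispatchSemaphore(value: 0)

let cancellable = mergedResponses().sink { value in
    // Traps if the value arrives anywhere other than dispatchQueue.
    dispatchPrecondition(condition: .onQueue(dispatchQueue))
    received.append(value)
    if received.count == 2 { done.signal() }
}

networkPublisher.send("network")
cachedPublisher.send("cached")

// Block until both values have been delivered on the queue.
done.wait()
```

Funnelling delivery through one serial queue means the sink never runs concurrently with itself, which may be why adding the operators appeared to fix the missing responses.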