I have an array of publishers, and I want to wait until every one of them has emitted a value before working with the results.
I tried using collect() with Publishers.MergeMany (edit: I'm using MergeMany because I need a solution that scales to 40 publishers or more), but collect() gathers the values in the order they arrive rather than in the order of the publishers, so I can't correlate each value with the publisher that emitted it.
To simplify the code and illustrate the issue, here's the Playground code:
import Combine
var cancellables = [AnyCancellable]()
let stringPublishers = [
    Just("1").delay(for: RunLoop.SchedulerTimeType.Stride(Double.random(in: 0.1...1.0)), scheduler: RunLoop.main),
    Just("2").delay(for: RunLoop.SchedulerTimeType.Stride(Double.random(in: 0.1...1.0)), scheduler: RunLoop.main),
    Just("3").delay(for: RunLoop.SchedulerTimeType.Stride(Double.random(in: 0.1...1.0)), scheduler: RunLoop.main),
    Just("4").delay(for: RunLoop.SchedulerTimeType.Stride(Double.random(in: 0.1...1.0)), scheduler: RunLoop.main),
    Just("5").delay(for: RunLoop.SchedulerTimeType.Stride(Double.random(in: 0.1...1.0)), scheduler: RunLoop.main)
]
Publishers.MergeMany(stringPublishers).collect().sink { value in
    print(value)
}.store(in: &cancellables)
When I run the code, the results are usually out of order, for example ["2", "4", "5", "3", "1"] or ["1", "4", "5", "3", "2"].
How can I always get ["1", "2", "3", "4", "5"]?
I tried adapting the solution here, but I'm having trouble because my inputArray is already an array of Publishers, not plain values such as integers or strings.
CodePudding user response:
Here is a function that takes any number of publishers of the same type and combines them into a single publisher that emits an array of their values, in the same order as the input publishers:
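func combineLatest<Pub>(_ pubs: [Pub]) -> AnyPublisher<[Pub.Output], Pub.Failure> where Pub: Publisher {
    // With no publishers there is nothing to wait for, so complete immediately.
    guard !pubs.isEmpty else { return Empty().eraseToAnyPublisher() }
    // Start with the first publisher wrapped in a one-element array,
    // then append each following publisher's value at its own position.
    return pubs.dropFirst().reduce(pubs[0].map { [$0] }.eraseToAnyPublisher()) { partial, next in
        partial.combineLatest(next)
            .map { $0 + [$1] }
            .eraseToAnyPublisher()
    }
}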
It can be used like this:
combineLatest(stringPublishers).sink(receiveValue: {
    print($0)
})
.store(in: &cancellables)
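To see that the position in the array follows the position of the publisher and not the arrival time, here is a small sketch that swaps the delayed Just publishers for PassthroughSubject instances so the emission order can be controlled by hand. The names subjects and received are made up for this illustration, and the helper is repeated only so the snippet runs on its own.

```swift
import Combine

// Same helper as in the answer, repeated so this sketch is self-contained.
func combineLatest<Pub: Publisher>(_ pubs: [Pub]) -> AnyPublisher<[Pub.Output], Pub.Failure> {
    guard let first = pubs.first else { return Empty().eraseToAnyPublisher() }
    return pubs.dropFirst().reduce(first.map { [$0] }.eraseToAnyPublisher()) { partial, next in
        partial.combineLatest(next)
            .map { $0 + [$1] }
            .eraseToAnyPublisher()
    }
}

// Illustration only: subjects stand in for the delayed publishers.
let subjects = (0..<3).map { _ in PassthroughSubject<String, Never>() }
var received: [[String]] = []
let cancellable = combineLatest(subjects).sink { received.append($0) }

// Emit deliberately out of order.
subjects[2].send("3")
subjects[0].send("1")
subjects[1].send("2")   // every subject has now emitted, so the first array is delivered
subjects[0].send("1b")  // a later emission delivers an updated array

print(received)  // [["1", "2", "3"], ["1b", "2", "3"]]
```

Note that this follows combineLatest semantics: once every publisher has emitted at least once, any further emission produces another array. With publishers that emit exactly one value, such as the ones in the question, you get a single array.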
If you don't like free functions, you can put it in an extension:
extension Collection where Element: Publisher {
    func combineLatest() -> AnyPublisher<[Element.Output], Element.Failure> {
        guard !isEmpty else { return Empty().eraseToAnyPublisher() }
        // first! is safe here because the guard above rules out an empty collection.
        return dropFirst().reduce(first!.map { [$0] }.eraseToAnyPublisher()) { partial, next in
            partial.combineLatest(next)
                .map { $0 + [$1] }
                .eraseToAnyPublisher()
        }
    }
}
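If you would rather stay with MergeMany and collect() as in the question, one possible alternative is to tag each value with the index of its publisher and sort by that index after collecting. This is only a sketch: collectInOrder is a hypothetical name, not a Combine API, and it assumes every publisher emits exactly one value and then finishes, since collect() delivers nothing until all upstream publishers have completed.

```swift
import Combine

extension Collection where Element: Publisher {
    /// Hypothetical helper: merges all publishers, waits for them to finish,
    /// and returns the values sorted by the position of their publisher.
    /// Assumes each publisher emits exactly one value before completing.
    func collectInOrder() -> AnyPublisher<[Element.Output], Element.Failure> {
        // Pair every value with the index of the publisher that produced it.
        let tagged = enumerated().map { index, publisher in
            publisher.map { (index, $0) }
        }
        return Publishers.MergeMany(tagged)
            .collect()
            // Restore publisher order, then drop the index tags.
            .map { pairs in pairs.sorted { $0.0 < $1.0 }.map { $0.1 } }
            .eraseToAnyPublisher()
    }
}

// Illustration only: subjects make the out-of-order arrival explicit.
let orderedSubjects = (0..<3).map { _ in PassthroughSubject<String, Never>() }
var orderedResult: [String] = []
let orderedCancellable = orderedSubjects.collectInOrder().sink { orderedResult = $0 }

orderedSubjects[2].send("3")
orderedSubjects[2].send(completion: .finished)
orderedSubjects[0].send("1")
orderedSubjects[0].send(completion: .finished)
orderedSubjects[1].send("2")
orderedSubjects[1].send(completion: .finished)  // all finished, collect() delivers

print(orderedResult)  // ["1", "2", "3"]
```

Unlike the combineLatest version, this builds a flat merge rather than a chain of nested publishers, but it only delivers a result once every publisher has completed.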