I have a function that builds several publishers and merges them into a single publisher with MergeMany. The problem is that some users might have a LOT of endpoints, and hitting all of these endpoints at once frequently results in server timeouts. Is there a way to limit the number of concurrent network requests (like DispatchSemaphore) in Combine?
let mergedPubs = Publishers.MergeMany(urlRequests.map {
    dataTaskPublisher(for: $0)
        .decode(type: RawJSON.self, decoder: JSONDecoder())
        .mapError { _ in URLError(.badServerResponse) }
})
.collect()
.eraseToAnyPublisher()
CodePudding user response:
There's no out-of-the-box solution offered by Combine for this, but we can build one on top of the existing Publishers.MergeMany and Publishers.Concatenate.
The idea is:
- Divide the input array into chunks of at most maxConcurrent requests. E.g. using a simple Int array [1, 2, 3, 4, 5, 6] and maxConcurrent = 3 we get [[1, 2, 3], [4, 5, 6]], so requests 1, 2, 3 will be performed in parallel, but 4, 5, 6 will only start when the previous chunk completes.
- Use Publishers.MergeMany on each of these subarrays. This gives us [Publisher([1, 2, 3]), Publisher([4, 5, 6])].
- Concatenate each new publisher coming out of the previous step.
To achieve this we essentially need to implement a Publishers.ConcatenateMany by leveraging Publishers.Concatenate, which takes only two input streams. If you want to follow Combine style this should be implemented in a brand-new struct, but for now I implemented it as a static func.
extension Publishers {
    static func concatenateMany<Output, Failure>(_ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
        // Fold the array, chaining each publisher after the previous one;
        // Empty is the identity element for concatenation.
        return publishers.reduce(Empty().eraseToAnyPublisher()) { acc, elem in
            Publishers.Concatenate(prefix: acc, suffix: elem).eraseToAnyPublisher()
        }
    }
}
Utility to chunk an Array:
extension Array {
    func chunked(into size: Int) -> [[Element]] {
        return stride(from: 0, to: count, by: size).map {
            Array(self[$0 ..< Swift.min($0 + size, count)])
        }
    }
}
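As a quick sanity check, the chunking behaves like this (a small standalone example, independent of Combine; the helper is repeated here so the snippet runs on its own):

```swift
// Standalone demo of chunked(into:) — no Combine required.
extension Array {
    func chunked(into size: Int) -> [[Element]] {
        return stride(from: 0, to: count, by: size).map {
            Array(self[$0 ..< Swift.min($0 + size, count)])
        }
    }
}

let ids = [1, 2, 3, 4, 5, 6, 7]
// The last chunk is smaller when count isn't a multiple of size.
print(ids.chunked(into: 3)) // [[1, 2, 3], [4, 5, 6], [7]]
```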
And we can now implement a new version of MergeMany that also takes a maxConcurrent parameter.
extension Publishers {
    static func mergeMany<Output, Failure>(maxConcurrent: Int, _ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
        return Publishers.concatenateMany(
            publishers.chunked(into: maxConcurrent)
                .map {
                    Publishers.MergeMany($0)
                        .eraseToAnyPublisher()
                }
        )
    }
}
And finally, with your request publishers gathered into an array (here called requests), your code would look like this:
let mergedPubs = Publishers.mergeMany(maxConcurrent: 3, requests)
    .collect()
    .eraseToAnyPublisher()
This is just one idea and there might be other ways to achieve the same result!
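For completeness, one such alternative is Combine's built-in back-pressure via flatMap(maxPublishers:), which caps how many inner publishers are subscribed at once. Unlike the chunked approach, a new request starts as soon as any in-flight request finishes, rather than waiting for a whole chunk to complete. This sketch assumes you have a plain [URL] array and want the raw Data back; adapt the inner pipeline to your own decoding:

```swift
import Combine
import Foundation

// Sketch: keep at most 3 requests in flight at any time.
// `limitedMerge` is a hypothetical helper name, not a Combine API.
func limitedMerge(_ urls: [URL]) -> AnyPublisher<[Data], URLError> {
    urls.publisher
        .setFailureType(to: URLError.self)
        .flatMap(maxPublishers: .max(3)) { url in
            URLSession.shared.dataTaskPublisher(for: url)
                .map(\.data)
        }
        .collect()
        .eraseToAnyPublisher()
}
```

The .max(3) demand means the upstream sequence only delivers a fourth URL once one of the first three inner publishers completes, so concurrency never exceeds three.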