Swift Combine MergeMany Publishers

Time:06-30

I have a function that builds several Publishers and returns them all as a single Publisher using MergeMany. The problem is that some users might have a LOT of endpoints in this publisher, and hitting all of these endpoints at once frequently results in server timeouts. Is there a way in Combine to limit the number of concurrent network requests (similar to a DispatchSemaphore)?

let mergedPubs = Publishers.MergeMany(urlRequests.map {
    URLSession.shared.dataTaskPublisher(for: $0)
        .map(\.data) // decode needs Data, not the (data, response) tuple
        .decode(type: RawJSON.self, decoder: JSONDecoder())
        .mapError { _ in
            return URLError(URLError.Code.badServerResponse)
        }
})
    .collect()
    .eraseToAnyPublisher()

Answer:

Combine offers no out-of-the-box solution for this, but we can build one on top of the existing Publishers.MergeMany and Publishers.Concatenate.

The idea is:

  • Divide the input array into chunks of at most maxConcurrent requests. E.g., with a simple Int array [1, 2, 3, 4, 5, 6] and maxConcurrent = 3 we get [[1, 2, 3], [4, 5, 6]], so requests 1, 2, 3 run in parallel, while 4, 5, 6 start only when the previous chunk completes.
  • Apply Publishers.MergeMany to each of these subarrays, giving [Publisher([1, 2, 3]), Publisher([4, 5, 6])].
  • Concatenate the publishers produced by the previous step, so each one is subscribed to only after the one before it finishes.

To achieve this, we essentially need to implement a Publishers.ConcatenateMany on top of Publishers.Concatenate, which takes only two input streams. To follow Combine style, this should be implemented as a brand new struct, but for now I implemented it as a static func.

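extension Publishers {
  static func concatenateMany<Output, Failure>(_ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
    // Fold the array into nested two-input Concatenate publishers, starting from
    // an Empty publisher that finishes immediately without emitting anything.
    return publishers.reduce(Empty<Output, Failure>().eraseToAnyPublisher()) { acc, elem in
      Publishers.Concatenate(prefix: acc, suffix: elem).eraseToAnyPublisher()
    }
  }
}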
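The answer notes that a Combine-style version would be a dedicated struct rather than a static func. Below is a minimal sketch of that idea; Publishers.ConcatenateMany is a hypothetical type (Combine does not ship one) that simply builds the same Concatenate chain inside receive(subscriber:). The usage part records when each source is subscribed, to show that a source is only subscribed after the previous one finishes.

```swift
import Combine

extension Publishers {
    // Hypothetical Combine-style wrapper (not part of Combine): emits every
    // element of each upstream in order, subscribing to the next upstream
    // only after the previous one finishes.
    struct ConcatenateMany<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        let publishers: [Upstream]

        init(_ publishers: [Upstream]) {
            self.publishers = publishers
        }

        func receive<S: Subscriber>(subscriber: S)
        where S.Input == Output, S.Failure == Failure {
            // Same fold as concatenateMany: nested two-input Concatenate
            // publishers, starting from an Empty that finishes immediately.
            let chained = publishers.reduce(Empty<Output, Failure>().eraseToAnyPublisher()) { acc, next in
                Publishers.Concatenate(prefix: acc, suffix: next.eraseToAnyPublisher())
                    .eraseToAnyPublisher()
            }
            chained.subscribe(subscriber)
        }
    }
}

// Usage: three synchronous sources; log the order in which they are subscribed.
var subscribed: [Int] = []
var received: [Int] = []
let sources = [[1, 2], [3], [4, 5]].enumerated().map { index, values in
    values.publisher
        .handleEvents(receiveSubscription: { _ in subscribed.append(index) })
        .eraseToAnyPublisher()
}
let cancellable = Publishers.ConcatenateMany(sources)
    .sink { received.append($0) }
// received == [1, 2, 3, 4, 5], subscribed == [0, 1, 2]
```

Wrapping the chain in a struct mainly buys a named type and the Combine-style call site; the behavior is the same as the static func.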

Utility to chunk an Array:

extension Array {
  func chunked(into size: Int) -> [[Element]] {
    // Consecutive slices of `size` elements; the last one may be shorter.
    return stride(from: 0, to: count, by: size).map {
      Array(self[$0 ..< Swift.min($0 + size, count)])
    }
  }
}
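A quick check of the chunking step on the Int example from the list above, plus a length that is not a multiple of the chunk size and an empty array. The precondition guarding against a zero size is an addition of this sketch (stride would trap on a zero step anyway).

```swift
extension Array {
    // Splits the array into consecutive slices of at most `size` elements.
    // The last chunk is shorter when `count` is not a multiple of `size`.
    func chunked(into size: Int) -> [[Element]] {
        precondition(size > 0, "chunk size must be positive")
        return stride(from: 0, to: count, by: size).map {
            Array(self[$0 ..< Swift.min($0 + size, count)])
        }
    }
}

let even = [1, 2, 3, 4, 5, 6].chunked(into: 3)      // [[1, 2, 3], [4, 5, 6]]
let uneven = [1, 2, 3, 4, 5, 6, 7].chunked(into: 3) // [[1, 2, 3], [4, 5, 6], [7]]
let empty = [Int]().chunked(into: 3)                // []
```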

We can now implement a new version of MergeMany that also takes a maxConcurrent parameter.

extension Publishers {
  static func mergeMany<Output, Failure>(maxConcurrent: Int, _ publishers: [AnyPublisher<Output, Failure>]) -> AnyPublisher<Output, Failure> {
    // Run each group of at most maxConcurrent publishers in parallel (MergeMany),
    // then run the groups one after another (concatenateMany).
    return Publishers.concatenateMany(
      publishers.chunked(into: maxConcurrent)
        .map {
          Publishers.MergeMany($0)
            .eraseToAnyPublisher()
        }
    )
  }
}
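To check that no more than maxConcurrent jobs run at once, here is a self-contained sketch that swaps the network calls for a hypothetical simulatedRequest(_:) stand-in (a Deferred Future that finishes after roughly 50 ms, an arbitrary assumption) and records the peak number of running jobs. It repeats chunked, concatenateMany and mergeMany so that it compiles on its own.

```swift
import Combine
import Foundation

extension Array {
    func chunked(into size: Int) -> [[Element]] {
        stride(from: 0, to: count, by: size).map {
            Array(self[$0 ..< Swift.min($0 + size, count)])
        }
    }
}

extension Publishers {
    static func concatenateMany<Output, Failure>(
        _ publishers: [AnyPublisher<Output, Failure>]
    ) -> AnyPublisher<Output, Failure> {
        publishers.reduce(Empty<Output, Failure>().eraseToAnyPublisher()) { acc, elem in
            Publishers.Concatenate(prefix: acc, suffix: elem).eraseToAnyPublisher()
        }
    }

    static func mergeMany<Output, Failure>(
        maxConcurrent: Int,
        _ publishers: [AnyPublisher<Output, Failure>]
    ) -> AnyPublisher<Output, Failure> {
        concatenateMany(
            publishers.chunked(into: maxConcurrent).map {
                Publishers.MergeMany($0).eraseToAnyPublisher()
            }
        )
    }
}

// Hypothetical stand-in for a network request: Deferred delays creating the
// Future until subscription, and the job counts itself as "in flight" until
// it finishes on a background queue.
let lock = NSLock()
var inFlight = 0
var maxInFlight = 0

func simulatedRequest(_ id: Int) -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            lock.lock()
            inFlight += 1
            maxInFlight = max(maxInFlight, inFlight)
            lock.unlock()
            DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) {
                lock.lock()
                inFlight -= 1
                lock.unlock()
                promise(.success(id))
            }
        }
    }
    .eraseToAnyPublisher()
}

// Seven jobs in chunks of three: [1, 2, 3], [4, 5, 6], [7].
let done = DispatchSemaphore(value: 0)
var results: [Int] = []
let cancellable = Publishers.mergeMany(maxConcurrent: 3, (1...7).map(simulatedRequest))
    .collect()
    .sink { values in
        results = values
        done.signal()
    }
done.wait()
```

Deferred matters in this sketch: a bare Future runs its closure as soon as it is created, so later chunks would start immediately and the limit would not hold. dataTaskPublisher is already lazy (the task starts on subscription), so real requests do not need the wrapper. Also note that collect() gathers values in completion order, not request order.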

And finally your code would look like this:

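let requests: [AnyPublisher<RawJSON, URLError>] = urlRequests.map {
  URLSession.shared.dataTaskPublisher(for: $0)
    .map(\.data)
    .decode(type: RawJSON.self, decoder: JSONDecoder())
    .mapError { _ in URLError(URLError.Code.badServerResponse) }
    .eraseToAnyPublisher() // mergeMany(maxConcurrent:_:) takes [AnyPublisher]
}

let mergedPubs = Publishers.mergeMany(maxConcurrent: 3, requests)
  .collect()
  .eraseToAnyPublisher()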

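This is just one idea, and there might be other ways to achieve the same result! Note that each chunk waits for its slowest request before the next chunk starts, so a single slow endpoint delays every request in the chunks after it.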
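One such alternative, not part of the answer above, is Combine's flatMap(maxPublishers:), which gives a sliding window instead of fixed chunks: it subscribes to at most maxConcurrent inner publishers and pulls the next one from the array as soon as any in-flight one completes. A minimal sketch, where mergeLimited(maxConcurrent:_:) and simulatedRequest(_:) are hypothetical names and the 50 ms delay is an arbitrary stand-in for a network call:

```swift
import Combine
import Foundation

// Hypothetical helper: flatMap(maxPublishers:) keeps at most `maxConcurrent`
// inner subscriptions alive and requests the next publisher from the array
// whenever one of them completes.
func mergeLimited<Output, Failure: Error>(
    maxConcurrent: Int,
    _ publishers: [AnyPublisher<Output, Failure>]
) -> AnyPublisher<Output, Failure> {
    publishers.publisher
        .setFailureType(to: Failure.self)
        .flatMap(maxPublishers: .max(maxConcurrent)) { $0 }
        .eraseToAnyPublisher()
}

// Hypothetical stand-in for a network request that tracks how many run at once.
let lock = NSLock()
var inFlight = 0
var maxInFlight = 0

func simulatedRequest(_ id: Int) -> AnyPublisher<Int, Never> {
    Deferred {
        Future<Int, Never> { promise in
            lock.lock()
            inFlight += 1
            maxInFlight = max(maxInFlight, inFlight)
            lock.unlock()
            DispatchQueue.global().asyncAfter(deadline: .now() + 0.05) {
                lock.lock()
                inFlight -= 1
                lock.unlock()
                promise(.success(id))
            }
        }
    }
    .eraseToAnyPublisher()
}

let done = DispatchSemaphore(value: 0)
var results: [Int] = []
let cancellable = mergeLimited(maxConcurrent: 3, (1...7).map(simulatedRequest))
    .collect()
    .sink { values in
        results = values
        done.signal()
    }
done.wait()
```

With the question's code this would read mergeLimited(maxConcurrent: 3, requests).collect(), where requests is the array of erased per-URL publishers. The trade-off versus chunking is that a slow request only occupies one slot instead of holding back a whole batch.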
