Swift Combine collect values until last page is found or until time expires

Time:10-19

I want to collect values until I see the last page, but if the last page never arrives I want to send whatever has been collected once a time limit expires.

I have a way of doing this, but it seems rather wasteful. I'll be using it to build collections that may hold hundreds of thousands of values, so a more space-efficient method would be preferred.

You can copy and paste the following into a playground:

import UIKit
import Foundation
import Combine

var subj = PassthroughSubject<String, Never>()

let queue = DispatchQueue(label: "Test")

let collectForTime = subj
    .collect(.byTime(queue, .seconds(10)))

let collectUntilLast = subj
  .scan([String]()) { $0 + [$1] }  // append each new value to the running array
  .first { $0.last == "LastPage" }

// whichever happens first
let cancel = collectForTime.merge(with: collectUntilLast)
    .first()
    .sink {
        print("complete1: \($0)")
    } receiveValue: {
        print("received1: \($0)")
    }

print("start")

let strings = [
    "!@#$",
    "ZXCV",
    "LastPage", // comment this line to test to see what happens if no last page is sent
    "ASDF",
    "JKL:"
]
// if the last page is present then the items 0..<3 will be sent
// if there's no last page then send what we have
// the main thing is that the system is not just sitting there waiting for a last page that never comes.

for i in (0..<strings.count) {
    DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(i)) {
        let s = strings[i]
        print("sending \(s)")
        subj.send(s)
    }
}
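Part of the cost of the scan-based version is that $0 + [$1] builds a new, longer array for every element, and every one of those intermediate arrays is also sent downstream. A small sketch that only counts emissions makes the difference with plain collect() visible; the element count of 1,000 is arbitrary.

```swift
import Combine

// Compare how many arrays reach the subscriber with scan vs. collect().
let numbers = PassthroughSubject<Int, Never>()

var scanEmissions = 0
var lastScanCount = 0
let scanCancellable = numbers
    .scan([Int]()) { $0 + [$1] }   // a new, longer array per element
    .sink { array in
        scanEmissions += 1
        lastScanCount = array.count
    }

var collected: [Int] = []
var collectEmissions = 0
let collectCancellable = numbers
    .collect()                     // one array, emitted when the upstream finishes
    .sink { array in
        collectEmissions += 1
        collected = array
    }

for i in 0..<1_000 { numbers.send(i) }
numbers.send(completion: .finished)

print(scanEmissions, collectEmissions)   // 1000 1
```

Pipelines that stop the upstream first and then call collect() hand only one array to the subscriber, which matters when the collection grows to hundreds of thousands of elements.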

Answer:

I made a small change to your technique:

import Foundation
import Combine

var subj = PassthroughSubject<String, Never>()

let lastOrTimeout = subj
    .timeout(.seconds(10), scheduler: RunLoop.main)
    .print("watchdog")
    .first { $0 == "LastPage" }
    .append(Just("Done"))

let cancel = subj
    .prefix(untilOutputFrom: lastOrTimeout)
    .print("main_publisher")
    .collect()
    .sink {
        print("complete1: \($0)")
    } receiveValue: {
        print("received1: \($0)")
    }

print("start")

let strings = [
    "!@#$",
    "ZXCV",
    "LastPage", // comment this line to test to see what happens if no last page is sent
    "ASDF",
    "JKL:"
]
// if the last page is present then the items 0..<3 will be sent
// if there's no last page then send what we have
// the main thing is that the system is not just sitting there waiting for a last page that never comes.

strings.enumerated().forEach { index, string in
    DispatchQueue.main.asyncAfter(deadline: .now() + .seconds(index)) {
        print("sending \(string)")
        subj.send(string)
    }
}

lastOrTimeout emits a value when it sees LastPage; if the timeout finishes the stream first, first emits nothing and append emits "Done" instead.

The main pipeline passes values through until the watchdog publisher emits, and collect() then gathers everything it received into a single array.
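Note that timeout measures the gap between elements, not the total elapsed time, so as long as pages keep arriving less than 10 seconds apart the watchdog never fires. If a hard overall deadline is wanted instead, one option (an assumption, not part of the answer above) is to merge the LastPage check with a one-shot Just(()).delay(...), as in this sketch. The 0.2-second deadline is only there so the demo finishes quickly.

```swift
import Combine
import Foundation

let subj = PassthroughSubject<String, Never>()

// Fires once: when "LastPage" is seen or when the deadline passes.
// The deadline counts from subscription, not from the last element.
let lastOrDeadline = subj
    .first { $0 == "LastPage" }
    .map { _ in () }
    .merge(with: Just(()).delay(for: .milliseconds(200), scheduler: RunLoop.main))

var result: [String]?
let cancellable = subj
    .prefix(untilOutputFrom: lastOrDeadline)
    .collect()
    .sink { result = $0 }

// No "LastPage" is sent, so only the deadline can end the collection.
subj.send("!@#$")
subj.send("ZXCV")

// Give the run loop time to fire the delayed deadline.
let giveUp = Date().addingTimeInterval(2)
while result == nil && Date() < giveUp {
    RunLoop.main.run(until: Date().addingTimeInterval(0.05))
}
print(result ?? [])   // ["!@#$", "ZXCV"]
```

Whether the LastPage element itself ends up in the array is not covered by this sketch; it depends on the order in which subj delivers to its two subscribers.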

Answer:

UPDATE

After playing a bit more with the playground I think all you need is:

subj
  .prefix { $0 != "LastPage" }
  .append("LastPage")
  .collect(.byTime(DispatchQueue.main, .seconds(10)))
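The .append("LastPage") is there because prefix(while:) finishes as soon as its predicate fails and drops that element. The sketch below checks that part synchronously; it swaps .collect(.byTime(...)) for plain collect() only so the demo needs no waiting.

```swift
import Combine

let pages = PassthroughSubject<String, Never>()
var batches: [[String]] = []

let cancellable = pages
    .prefix(while: { $0 != "LastPage" })  // finishes on "LastPage" and drops it
    .append("LastPage")                   // put the marker back before finishing
    .collect()                            // plain collect() instead of .byTime for the demo
    .sink { batches.append($0) }

pages.send("!@#$")
pages.send("ZXCV")
pages.send("LastPage")
pages.send("ASDF")   // ignored: the pipeline has already finished

print(batches)   // [["!@#$", "ZXCV", "LastPage"]]
```

With the real .collect(.byTime(DispatchQueue.main, .seconds(10))), a missing last page means the upstream never finishes, so the operator keeps publishing batches periodically rather than stopping after the first one.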

I wouldn't use collect, because under the hood it does essentially the same thing scan does. You only need another condition in the first closure, e.g. .first { $0.last == "LastPage" || timedOut }, to emit the collected items in case of a timeout.

It's unfortunate that collect doesn't offer the API you need, but we can create another version of it. The idea is to combineLatest the scan output with a stream that emits true after a deadline (it also has to emit false up front, because combineLatest only starts emitting once both inputs have produced a value) and to || this extra flag into the first condition.

Here is the code:

import Foundation
import Combine

extension Publisher {

  func collect<S: Scheduler>(
    timeoutAfter interval: S.SchedulerTimeType.Stride,
    scheduler: S,
    orWhere predicate: @escaping ([Output]) -> Bool
  ) -> AnyPublisher<[Output], Failure> {
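    scan([Output]()) { $0 + [$1] }  // running array of everything received so far
      .combineLatest(
        Just(true)
          .delay(for: interval, scheduler: scheduler)  // true once the deadline passes
          .prepend(false)                               // false up front so combineLatest can start
          .setFailureType(to: Failure.self)
      )
      .first { predicate($0) || $1 }  // stop on the predicate or on timeout
      .map(\.0)                        // drop the Bool, keep the array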
      .eraseToAnyPublisher()
  }

}

let subj = PassthroughSubject<String, Never>()

let cancel = subj
  .collect(
    timeoutAfter: .seconds(10),
    scheduler: DispatchQueue.main,
    orWhere: { $0.last == "LastPage" }
  )
  .print()
  .sink { _ in }
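For unit tests it can help to pass the deadline in as a publisher instead of building it from a scheduler. The sketch below is a hypothetical variant of the extension above (collect(orWhere:stopWhen:) is not a Combine API) in which any publisher, for example a subject standing in for the timer, acts as the deadline.

```swift
import Combine

extension Publisher where Failure == Never {
    /// Hypothetical helper (not part of Combine): emits the elements received
    /// so far once `predicate` accepts them or `stop` emits, whichever is first.
    func collect<Stop: Publisher>(
        orWhere predicate: @escaping ([Output]) -> Bool,
        stopWhen stop: Stop
    ) -> AnyPublisher<[Output], Never> where Stop.Failure == Never {
        scan([Output]()) { $0 + [$1] }
            // false up front so combineLatest can emit before `stop` fires
            .combineLatest(stop.map { _ in true }.prepend(false))
            .first { pair in predicate(pair.0) || pair.1 }
            .map { pair in pair.0 }
            .eraseToAnyPublisher()
    }
}

let pages = PassthroughSubject<String, Never>()
let fakeTimer = PassthroughSubject<Void, Never>()   // stands in for the real deadline

var result: [String]?
let cancellable = pages
    .collect(orWhere: { $0.last == "LastPage" }, stopWhen: fakeTimer)
    .sink { result = $0 }

pages.send("!@#$")
pages.send("ZXCV")
fakeTimer.send(())      // "time expires" before any last page arrives
print(result ?? [])     // ["!@#$", "ZXCV"]
```

In production, stopWhen: could receive something like Just(()).delay(for: .seconds(10), scheduler: DispatchQueue.main), which gives the same behaviour as the scheduler-based extension.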