I'm subscribing to Combine publishers, but frequently calling Concurrent tasks within the .sink
. Is there a more convenient way to do this?
import _Concurrency
import Combine
import Foundation
import PlaygroundSupport
var cancellable = Set<AnyCancellable>()
struct MyService {
private static let subject = PassthroughSubject<String, Never>()
init() {
Task {
try? await Task.sleep(until: .now .seconds(2), clock: .suspending)
Self.subject.send("Publisher: " Date.now.formatted())
}
}
func publisher() -> AnyPublisher<String, Never> {
Self.subject.eraseToAnyPublisher()
}
}
class MyClass {
let service: MyService
var cancellable: AnyCancellable?
init() {
service = MyService()
subscribe()
}
func subscribe() {
// HERE ===>
cancellable = service.publisher()
.sink { value in Task { [weak self] in await self?.doThings(value: value) } }
}
func doThings(value: String) async {
print(value)
try? await Task.sleep(until: .now .seconds(2), clock: .suspending)
print("Things done!")
}
}
let test = MyClass()
PlaygroundPage.current.needsIndefiniteExecution = true
What I'd like to do is seamlessly call Concurrency tasks in the .sink
something like:
// From:
service.publisher()
.sink { value in Task { [weak self] in await self?.doThings(value: value) }
// To:
service.publisher()
.sink { [weak self] value in await self?.doThings(value: value) }
CodePudding user response:
You can write an extension like this:
extension Publisher where Failure == Never {
func sinkAsync(receiveValue: @escaping ((Self.Output) async throws -> Void)) -> AnyCancellable {
sink { value in
Task {
try await receiveValue(value)
}
}
}
}
Notice that the closure it takes is async throws
, which is the same as what the closure of Task.init
has.
The "native" way of doing this, I think, would be to convert the publisher to an async sequence.
Task {
for await value in service.publisher().values {
await self.doThings(value: value)
}
}
Note that by doing this, you don't need to manage the cancellable!