Home > Enterprise >  Shorthand for calling Concurrency within Combine sink?
Shorthand for calling Concurrency within Combine sink?

Time:10-09

I'm subscribing to Combine publishers, but frequently calling Concurrent tasks within the .sink. Is there a more convenient way to do this?

import _Concurrency
import Combine
import Foundation
import PlaygroundSupport

var cancellable = Set<AnyCancellable>()

struct MyService {
    private static let subject = PassthroughSubject<String, Never>()

    init() {
        Task {
            try? await Task.sleep(until: .now   .seconds(2), clock: .suspending)
            Self.subject.send("Publisher: "   Date.now.formatted())
        }
    }

    func publisher() -> AnyPublisher<String, Never> {
        Self.subject.eraseToAnyPublisher()
    }
}

class MyClass {
    let service: MyService
    var cancellable: AnyCancellable?

    init() {
        service = MyService()
        subscribe()
    }

    func subscribe() {
        // HERE ===>
        cancellable = service.publisher()
            .sink { value in Task { [weak self] in await self?.doThings(value: value) } }
    }

    func doThings(value: String) async {
        print(value)
        try? await Task.sleep(until: .now   .seconds(2), clock: .suspending)
        print("Things done!")
    }
}


let test = MyClass()


PlaygroundPage.current.needsIndefiniteExecution = true

What I'd like to do is seamlessly call Concurrency tasks in the .sink something like:

// From:
service.publisher()
    .sink { value in Task { [weak self] in await self?.doThings(value: value) }

// To:
service.publisher()
    .sink { [weak self] value in await self?.doThings(value: value) }

CodePudding user response:

You can write an extension like this:

extension Publisher where Failure == Never {
    func sinkAsync(receiveValue: @escaping ((Self.Output) async throws -> Void)) -> AnyCancellable {
        sink { value in
            Task {
                try await receiveValue(value)
            }
        }
    }
}

Notice that the closure it takes is async throws, which is the same as what the closure of Task.init has.

The "native" way of doing this, I think, would be to convert the publisher to an async sequence.

Task {
    for await value in service.publisher().values {
        await self.doThings(value: value)
    }
}

Note that by doing this, you don't need to manage the cancellable!

  • Related