Swift combine type as Publisher to async/await

Time:11-29

I'm trying to convert some Combine-friendly code to async/await. For this post I'll use a small example, cut down from real code, to keep the discussion simple.


import Combine
import Foundation

enum State {
    case a
    case b
    case c
}

class StateMachine: Publisher {

    typealias Output = State
    typealias Failure = Error

    private let currentState = CurrentValueSubject<State, Error>(State.a)

    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        currentState.receive(subscriber: subscriber)
    }

    func gotoB() { currentState.value = .b }
    func gotoC() { currentState.value = .c }
}

let sm = StateMachine()
let c = sm.sink {
    print("Failed with \($0)")
}
receiveValue: {
    print("New value \($0)")
}

sm.gotoB()
sm.gotoC()

So in this code (which works in a playground) I want the StateMachine to act as a publisher so it can be subscribed to directly. To achieve this I use an internal CurrentValueSubject and forward the publisher's receive(subscriber:) to it.

However when I change class StateMachine: to actor StateMachine:

actor StateMachine: Publisher {
    //...
}

// ...
await sm.gotoB()
await sm.gotoC()

this code no longer compiles and throws these errors:

expression failed to parse:
error: SwiftFormat.playground:18:10: error: actor-isolated instance method 'receive(subscriber:)' cannot be used to satisfy nonisolated protocol requirement
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
         ^

SwiftFormat.playground:18:10: note: add 'nonisolated' to 'receive(subscriber:)' to make this instance method not isolated to the actor
    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
         ^
    nonisolated 

Combine.Publisher:5:10: note: 'receive(subscriber:)' declared here
    func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
         ^

Now I can add nonisolated as suggested, but I'm not sure that isn't going to break the concurrency handling that actor is introducing because there's nothing to stop concurrent access to the internal subject.

So is there a way to keep the type as a Combine Publisher, or is there some other async/await approach I've just not read about which is now the preferred approach?

CodePudding user response:

The first thing to ask here is "what is it that is asynchronous?"

I don't think in this case that gotoB or gotoC are asynchronous. The thing that is asynchronous is the subscription to the current value.

The consumers will subscribe to it and then at some point later in time it will change and they are updated.

With async/await this can be modelled as an AsyncStream.

To use an async stream, you iterate over it something like this:

for await state in stateMachine.values {
  print("received value \(state)")
}

So to make this work we need to make stateMachine give out an AsyncStream.

Something like this would work...

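import Foundation // for UUID

enum State {
    case a
    case b
    case c
}

// Note: a plain class like this is not thread-safe; use it from one thread/actor only.
class StateMachine {
    // Continuations of the active subscribers, keyed by a per-subscriber id.
    var subscriptions: [UUID: AsyncStream<State>.Continuation] = [:] // 1.

    var currentState = State.a {
        didSet {
            // Push the new state to every active subscriber.
            subscriptions.values.forEach {
                $0.yield(currentState)
            }
        }
    } // 2.

    var values: AsyncStream<State> {
        AsyncStream { continuation in
            let id = UUID()
            subscriptions[id] = continuation

            // A new subscriber gets the current state straight away.
            continuation.yield(currentState)

            // Drop the continuation once the consumer stops iterating.
            continuation.onTermination = { [weak self] _ in
                self?.subscriptions.removeValue(forKey: id)
            }
        }
    } // 3.

    func gotoB() {
        currentState = .b
    }

    func gotoC() {
        currentState = .c
    }
}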

In this we...

  1. Create a place to store any incoming subscriptions
  2. Make sure that when current state changes we update all the subscribers
  3. When a new subscriber is added we give it an AsyncStream to listen to
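Since the question was about moving to an actor, the three steps above can also be sketched inside one, so that the subscriber dictionary is protected by actor isolation. This is only a rough sketch under some assumptions: values() as a method, removeContinuation(_:), finish() and demo() are names made up for illustration, and the guard let self shorthand needs Swift 5.7 or later.

```swift
import Foundation

enum State: Sendable {
    case a, b, c
}

actor StateMachine {
    // 1. Continuations of the active subscribers, protected by the actor.
    private var continuations: [UUID: AsyncStream<State>.Continuation] = [:]

    // 2. Every state change is pushed to all active subscribers.
    private(set) var currentState = State.a {
        didSet {
            for continuation in continuations.values {
                continuation.yield(currentState)
            }
        }
    }

    // 3. Each caller gets its own stream, seeded with the current state.
    func values() -> AsyncStream<State> {
        AsyncStream { continuation in
            let id = UUID()
            continuations[id] = continuation
            continuation.yield(currentState)

            // onTermination is @Sendable, so hop back onto the actor to clean up.
            continuation.onTermination = { [weak self] _ in
                guard let self else { return }
                Task { await self.removeContinuation(id) }
            }
        }
    }

    private func removeContinuation(_ id: UUID) {
        continuations[id] = nil
    }

    func gotoB() { currentState = .b }
    func gotoC() { currentState = .c }

    // Hypothetical helper: ends every stream so that consumers' loops exit.
    func finish() {
        for continuation in continuations.values {
            continuation.finish()
        }
        continuations.removeAll()
    }
}

// Hypothetical usage: subscribe, drive the machine, then collect what arrived.
func demo() async -> [State] {
    let machine = StateMachine()
    let stream = await machine.values()
    await machine.gotoB()
    await machine.gotoC()
    await machine.finish()

    var received: [State] = []
    for await state in stream {
        received.append(state)
    }
    return received // [.a, .b, .c] with the default unbounded buffering
}
```

The stream is created with the default unbounded buffering policy, so states yielded before the consumer starts iterating are kept rather than dropped.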

This is only the beginning of what you would need to create. You might need some sort of shared (singleton) access to it if multiple places in your app need to subscribe to the same state.

But this should give you the initial idea. We use something very similar in the app I am currently working on: sometimes to create a subscribable cache, sometimes to respond to error messages, and so on.
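If keeping the type as a Combine Publisher matters more than making it an actor, another option worth knowing is that Combine itself bridges to async/await: on iOS 15 / macOS 12 and later, every Publisher has a values property that is an AsyncSequence. A minimal sketch, reusing the class from the question unchanged; firstState(of:) is a hypothetical helper added only for illustration:

```swift
import Combine

enum State {
    case a, b, c
}

// Same shape as the class in the question: a Publisher backed by a subject.
final class StateMachine: Publisher {
    typealias Output = State
    typealias Failure = Error

    private let currentState = CurrentValueSubject<State, Error>(.a)

    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        currentState.receive(subscriber: subscriber)
    }

    func gotoB() { currentState.value = .b }
    func gotoC() { currentState.value = .c }
}

// Hypothetical helper: returns the first state the publisher delivers.
func firstState(of machine: StateMachine) async throws -> State? {
    // Failure is Error, so `values` is an AsyncThrowingPublisher and needs `try`.
    for try await state in machine.values {
        return state // leaving the loop cancels the subscription
    }
    return nil
}
```

One caveat: as far as I know this bridge does not buffer, so states published while the consumer is busy between iterations can be missed. If every transition matters, the AsyncStream approach is probably the safer choice.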
