I'm trying to convert some Combine-friendly code to async/await. For this post I'll use a simple example, cut down from some real code, to keep the discussion simple.
import Combine
import Foundation

enum State {
    case a
    case b
    case c
}

class StateMachine: Publisher {
    typealias Output = State
    typealias Failure = Error

    private let currentState = CurrentValueSubject<State, Error>(State.a)

    func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
        currentState.receive(subscriber: subscriber)
    }

    func gotoB() { currentState.value = .b }
    func gotoC() { currentState.value = .c }
}

let sm = StateMachine()

let c = sm.sink {
    print("Failed with \($0)")
}
receiveValue: {
    print("New value \($0)")
}

sm.gotoB()
sm.gotoC()
So in this code (which works in a playground) I want the StateMachine to act as a publisher so it can be subscribed to directly. To achieve this I use an internal CurrentValueSubject and forward the publisher's receive(subscriber:) to it.
However, when I change class StateMachine: to actor StateMachine:
actor StateMachine: Publisher {
//...
}
// ...
await sm.gotoB()
await sm.gotoC()
this code no longer compiles and throws these errors:
expression failed to parse:
error: SwiftFormat.playground:18:10: error: actor-isolated instance method 'receive(subscriber:)' cannot be used to satisfy nonisolated protocol requirement
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
^
SwiftFormat.playground:18:10: note: add 'nonisolated' to 'receive(subscriber:)' to make this instance method not isolated to the actor
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
^
nonisolated
Combine.Publisher:5:10: note: 'receive(subscriber:)' declared here
func receive<S>(subscriber: S) where S : Subscriber, Self.Failure == S.Failure, Self.Output == S.Input
^
Now I can add nonisolated as suggested, but I'm not sure that won't break the concurrency handling that actor introduces, because there's nothing to stop concurrent access to the internal subject.
So is there a way to keep the type as a Combine Publisher, or is there some other async/await approach I've just not read about which is now preferred?
CodePudding user response:
The first thing to ask here is "what is it that is asynchronous?"
I don't think in this case that gotoB or gotoC are asynchronous. The thing that is asynchronous is the subscription to the current value.
The consumers subscribe to it and then, at some later point in time, it changes and they are updated.
In async/await this can be modelled as an AsyncStream.
To use an async stream you access it something like:
for await state in stateMachine.values {
print("received value \(state)")
}
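To see that consumption pattern on its own, here is a minimal sketch that is independent of the state machine. The names makeDemoStream and collect are hypothetical helpers invented for this illustration; only AsyncStream, yield, finish and for await come from the standard library.

```swift
// Hypothetical helper (not from the original post): builds a stream that
// emits a fixed list of values and then finishes.
func makeDemoStream() -> AsyncStream<String> {
    AsyncStream { continuation in
        for name in ["a", "b", "c"] {
            // With the default unbounded buffering policy, values are
            // queued until a consumer asks for them.
            continuation.yield(name)
        }
        // Finishing the stream is what ends the consumer's loop.
        continuation.finish()
    }
}

// Hypothetical helper: consumes a stream with `for await` and gathers
// everything it received.
func collect(_ stream: AsyncStream<String>) async -> [String] {
    var received: [String] = []
    for await value in stream {
        received.append(value)
    }
    return received
}
```

Calling await collect(makeDemoStream()) from an async context returns the three values in order. A stream that is never finished, like the state machine's, keeps the loop waiting for the next value instead.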
So to make this work we need to make stateMachine give out an AsyncStream.
Something like this would work...
import Foundation

enum State {
    case a
    case b
    case c
}

class StateMachine {
    // 1. One continuation per active subscriber, keyed by a unique id.
    var subscriptions: [UUID: AsyncStream<State>.Continuation] = [:]

    // 2. Every change is pushed to all current subscribers.
    var currentState = State.a {
        didSet {
            subscriptions.values.forEach {
                $0.yield(currentState)
            }
        }
    }

    // 3. Each access creates a new stream that starts with the current state.
    var values: AsyncStream<State> {
        AsyncStream { continuation in
            let id = UUID()
            subscriptions[id] = continuation
            continuation.yield(currentState)
            // Drop the continuation once the subscriber stops listening.
            continuation.onTermination = { [weak self] _ in
                self?.subscriptions.removeValue(forKey: id)
            }
        }
    }

    func gotoB() {
        currentState = .b
    }
}
In this we...
1. Create a place to store any incoming subscriptions
2. Make sure that when current state changes we update all the subscribers
3. When a new subscriber is added we give it an AsyncStream to listen to
This is the beginnings of what you would need to create. You might need some sort of singleton access to this if multiple places in your app need to subscribe to the same state.
But this should give you the initial idea. We use something very similar to this in the app I am currently working on, sometimes to create a subscribable cache, sometimes to respond to error messages, etc.
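Since the question was specifically about actors, the same three steps can probably be moved into an actor so that subscriptions and currentState are only touched from one isolation domain. The following is a rough sketch under that assumption, not a tested drop-in replacement: removeSubscription is a hypothetical helper added here, and the cleanup hops back onto the actor through a Task because onTermination is called outside the actor's isolation.

```swift
import Foundation

enum State {
    case a
    case b
    case c
}

// Assumption: an actor replaces the class from the answer so that all
// mutable state is protected by actor isolation.
actor StateMachine {
    private var subscriptions: [UUID: AsyncStream<State>.Continuation] = [:]

    private(set) var currentState = State.a {
        didSet {
            // Push every change to all live subscribers.
            for continuation in subscriptions.values {
                continuation.yield(currentState)
            }
        }
    }

    // Each access registers a new subscriber and starts it off with the
    // current state.
    var values: AsyncStream<State> {
        AsyncStream { continuation in
            let id = UUID()
            subscriptions[id] = continuation
            continuation.yield(currentState)
            continuation.onTermination = { [weak self] _ in
                // onTermination runs outside the actor, so the removal
                // has to be scheduled back onto it.
                Task { [weak self] in await self?.removeSubscription(id) }
            }
        }
    }

    func gotoB() { currentState = .b }
    func gotoC() { currentState = .c }

    // Hypothetical helper, not in the original answer.
    private func removeSubscription(_ id: UUID) {
        subscriptions.removeValue(forKey: id)
    }
}
```

A consumer would then write for await state in await stateMachine.values { ... } while other tasks call await stateMachine.gotoB(). The trade-off is that every access now needs await, which the class version avoids at the cost of having no protection against concurrent mutation.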