I'm trying out async streams, but I'm not sure I understand them correctly. I have the following two example functions:
func foo() async throws {
    print("Start")
    for await data in bar() {
        print(data)
    }
    print("End")
}

private func bar() -> AsyncStream<String> {
    return AsyncStream { continuation in
        print("Stream Started")
        for count in 0...2 {
            sleep(1)
            print("Yielding...")
            continuation.yield("\(count)")
        }
        continuation.finish()
    }
}
I would have expected this to print something like this:
Start
Stream Started
Yielding...
0
Yielding...
1
Yielding...
2
End
However, what I am seeing is:
Start
Stream Started
Yielding...
Yielding...
Yielding...
0
1
2
End
Is my expectation of how this stream should work wrong?
CodePudding user response:
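There are two kinds of sleep. One blocks the current thread (this is the one you are using):
https://developer.apple.com/documentation/foundation/thread/1413673-sleep
The other suspends only the current task and is the one that should be used with Swift concurrency:
https://developer.apple.com/documentation/swift/task/sleep(_:)
Note also that the closure passed to AsyncStream runs synchronously while the stream is being created, so in your version bar() does not return until every value has already been yielded and buffered. Doing the work inside a Task lets bar() return right away: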
private func bar() -> AsyncStream<String> {
    return AsyncStream { continuation in
        let task = Task {
            print("Stream Started")
            for count in 0...2 {
                // Concurrency version of sleeping
                try await Task.sleep(for: .seconds(1))
                print("Yielding...")
                continuation.yield("\(count)")
            }
            continuation.finish()
        }
        continuation.onTermination = { _ in
            // Make sure you cancel the task if the stream is terminated
            task.cancel()
        }
    }
}
Swift concurrency isn't tied directly to threads; it is organized around tasks and actors:
https://developer.apple.com/wwdc21/10132
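To see why the original output is batched, here is a minimal sketch that records the order of events when the builder closure yields everything synchronously. The function name eagerStreamEvents and the event strings are made up for illustration; the sketch relies on the default unbounded buffering policy.

```swift
// Hypothetical helper: returns the order in which things happen when
// the AsyncStream builder closure yields all of its values synchronously.
func eagerStreamEvents() async -> [String] {
    var events: [String] = []

    let stream = AsyncStream<Int> { continuation in
        // This closure runs to completion before the initializer returns.
        for count in 0...2 {
            events.append("yield \(count)")
            continuation.yield(count) // buffered, nobody is iterating yet
        }
        continuation.finish()
    }

    // Reached only after the closure above has finished.
    events.append("stream created")

    // The buffered values are delivered only now.
    for await value in stream {
        events.append("received \(value)")
    }
    return events
}
```

Calling it should give all three "yield" events first, then "stream created", then the three "received" events, which matches the ordering seen in the question.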
CodePudding user response:
The issue was using sleep(1) instead of try await Task.sleep(nanoseconds: 1_000_000_000).
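One caveat: the AsyncStream builder closure is not async, so the await call cannot go directly inside it and has to run in a Task. A rough sketch of what that could look like follows; the names delayedStream, collect, and the delayNanoseconds parameter are invented for this example, not part of any API.

```swift
// Hypothetical producer: yields "0", "1", "2" with a non-blocking delay
// before each value.
func delayedStream(delayNanoseconds: UInt64) -> AsyncStream<String> {
    AsyncStream { continuation in
        // The builder closure is synchronous, so the awaiting work
        // is moved into a Task and the closure returns immediately.
        let task = Task {
            for count in 0...2 {
                try await Task.sleep(nanoseconds: delayNanoseconds)
                continuation.yield("\(count)")
            }
            continuation.finish()
        }
        // Stop producing if the consumer goes away.
        continuation.onTermination = { _ in task.cancel() }
    }
}

// Hypothetical consumer: gathers every value the stream produces.
func collect(delayNanoseconds: UInt64) async -> [String] {
    var received: [String] = []
    for await value in delayedStream(delayNanoseconds: delayNanoseconds) {
        received.append(value)
    }
    return received
}
```

With a delay of 1_000_000_000 this waits about a second before each value, and the consumer gets each value as it is yielded instead of all at the end.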
CodePudding user response:
You need to use the proper sleep, and you need to detach the task. The doc comment on the AsyncStream initializer has sample code that does almost what your sample code does; refer to it.
Something like:
func foo() async throws {
    print("Start")
    for await data in bar() {
        print(data)
    }
    print("End")
}

func bar() -> AsyncStream<String> {
    return AsyncStream { continuation in
        print("Stream Started")
        Task.detached {
            for count in 0...2 {
                try await Task.sleep(nanoseconds: 1_000_000_000)
                print("Yielding...")
                continuation.yield("\(count)")
            }
            continuation.finish()
        }
    }
}