Swift: Map AsyncStream into another AsyncStream

Time:08-21

I would like to map an AsyncStream into another AsyncStream. I wonder if there is a .map that can be used just like for arrays.

Quoting from Apple documentation:

Creates an asynchronous sequence that maps the given closure over the asynchronous sequence’s elements.

The code below has an error:

Cannot convert value of type 'AsyncMapSequence<AsyncStream<Int>, Int>' to specified type 'AsyncStream<Int>'

As I understand it, this is because the return type of .map in this case is AsyncMapSequence<...> rather than AsyncStream<Int>.

Is there a way to just map an AsyncStream<T1> into an AsyncStream<T2> with a transform function T1 → T2, as it works for mapping Array<T1> into Array<T2>?

Thank you in advance!

import SwiftUI

@main
struct MacosPlaygroundApp: App {
    var body: some Scene {
        WindowGroup("Playground") {
            Text("Hello World")
                .padding(100)
                .onAppear {
                    Task {
                        let numStream: AsyncStream<Int> = AsyncStream { continuation in
                            Task {
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(0)
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(1)
                                try await Task.sleep(nanoseconds: 1_000_000_000)
                                continuation.yield(2)
                                continuation.finish()
                            }
                        }

                        let doubleNumStream: AsyncStream<Int> = numStream.map { num in
                            return 2 * num
                        }

                        for await doubleNum in doubleNumStream {
                            print("Next num is \(doubleNum)")
                        }

                    }
                }
        }
    }
}

CodePudding user response:

You said:

Let's say I have a function, input is some async sequence of data of a certain type T, and for each such T item, it does something with it... For this function, it doesn't matter how that async sequence was calculated, e.g. whether it was mapped from another sequence or not. Ideally I would type it as AsyncSequence<T> (T being a specific type in my actual code), but AsyncSequence doesn't take type parameters.

I would suggest making this function generic over AsyncSequence and constraining its Element. For example, here is a function that prints the values of a sequence of Int:

func printSequence<S: AsyncSequence>(_ sequence: S) async throws where S.Element == Int {
    for try await value in sequence {
        print("Next num is \(value)")
    }
    print("done")
}

This will work with any AsyncSequence of Int, either the original numStream or the mapped doubleNumStream.

Then, as Sweeper said, you can just use the existing map of AsyncSequence:

Task {
    let numStream = AsyncStream<Int> { continuation in
        Task {
            try await Task.sleep(nanoseconds: 1_000_000_000)
            continuation.yield(0)
            try await Task.sleep(nanoseconds: 1_000_000_000)
            continuation.yield(1)
            try await Task.sleep(nanoseconds: 1_000_000_000)
            continuation.yield(2)
            continuation.finish()                            // don't forget to finish the sequence
        }
    }

    let doubleNumStream = numStream.map { num in             // let it just infer the type for you
        return 2 * num
    }

    try await printSequence(doubleNumStream)
}
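
The same idea can be sketched without the SwiftUI scaffolding or the sleeps: the result of map is handed straight to a generic function, so its concrete type never has to be written out. The names collect, makeNumStream and doubledValues are hypothetical helpers introduced only for this illustration.

```swift
// Hypothetical helper: gathers every element of any AsyncSequence into an array.
func collect<S: AsyncSequence>(_ sequence: S) async throws -> [S.Element] {
    var result: [S.Element] = []
    for try await value in sequence {
        result.append(value)
    }
    return result
}

// Hypothetical helper: a stream that yields 0, 1, 2 and then finishes.
func makeNumStream() -> AsyncStream<Int> {
    AsyncStream { continuation in
        for n in 0..<3 {
            continuation.yield(n)
        }
        continuation.finish()
    }
}

// The mapped value has type AsyncMapSequence<AsyncStream<Int>, Int>,
// but the generic function accepts it without that type being named.
func doubledValues() async throws -> [Int] {
    let doubled = makeNumStream().map { 2 * $0 }
    return try await collect(doubled)   // [0, 2, 4]
}
```

Because collect is constrained only by AsyncSequence, it accepts the original stream and the mapped sequence alike.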

CodePudding user response:

How about extending AsyncStream?

extension AsyncStream {
    public func map<Transformed>(_ transform: @escaping (Self.Element) -> Transformed) -> AsyncStream<Transformed> {
        return AsyncStream<Transformed> { continuation in
            Task {
                for await element in self {
                    continuation.yield(transform(element))
                }
                continuation.finish()
            }
        }
    }

    public func map<Transformed>(_ transform: @escaping (Self.Element) async -> Transformed) -> AsyncStream<Transformed> {
        return AsyncStream<Transformed> { continuation in
            Task {
                for await element in self {
                    continuation.yield(await transform(element))
                }
                continuation.finish()
            }
        }
    }
}
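
One caveat with the extension above: the unstructured Task keeps iterating the source even after the consumer of the mapped stream has stopped listening. A possible refinement, offered as a sketch rather than a definitive implementation, is to cancel that task from the continuation's onTermination handler. The method name mapToStream is hypothetical, chosen here so it does not compete with the built-in AsyncSequence.map, and the Sendable constraints are an assumption made to keep the example compatible with strict concurrency checking.

```swift
extension AsyncStream where Element: Sendable {
    /// Hypothetical helper (not a standard-library API): maps this stream into a
    /// new AsyncStream and stops the forwarding task when the new stream terminates.
    func mapToStream<T: Sendable>(
        _ transform: @escaping @Sendable (Element) -> T
    ) -> AsyncStream<T> {
        AsyncStream<T> { continuation in
            // Forward each transformed element, then finish the mapped stream.
            let task = Task {
                for await element in self {
                    continuation.yield(transform(element))
                }
                continuation.finish()
            }
            // Runs when the mapped stream finishes or its consumer is cancelled.
            continuation.onTermination = { _ in task.cancel() }
        }
    }
}

// Usage: the result really is an AsyncStream, so the annotation compiles.
func labelledNumbers() async -> [String] {
    let numbers = AsyncStream<Int> { continuation in
        for n in 0..<3 { continuation.yield(n) }
        continuation.finish()
    }
    let labels: AsyncStream<String> = numbers.mapToStream { "#\($0)" }
    var collected: [String] = []
    for await label in labels {
        collected.append(label)
    }
    return collected
}
```

Note that an AsyncStream supports a single consumer, so the source stream should not be iterated elsewhere once it has been wrapped this way.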