How to convert a Source to a Flow?
Input: Source[ByteString, NotUsed]
Intermediary step: call an API which returns an InputStream
Output: Flow[ByteString, ByteString, NotUsed]
This is how I am doing it, where the input has type Source[ByteString, NotUsed]:
val sink: Sink[ByteString, InputStream] = StreamConverters.asInputStream()
val output: InputStream = <API CALL>
val mySource: Source[ByteString, Future[IOResult]] = StreamConverters.fromInputStream(() => output)
val myFlow: Flow[ByteString, ByteString, NotUsed] = Flow.fromSinkAndSource(sink, mySource)
When I use the above Flow on the source, it returns an empty result. Can someone help me figure out if I am doing it right?
CodePudding user response:
I'm not sure I fully grasp what you want to achieve, but maybe this is a use case for flatMapConcat:
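import akka.NotUsed
import akka.stream.IOResult
import akka.stream.scaladsl.{Flow, Source, StreamConverters}
import akka.util.ByteString
import scala.concurrent.Future

def readInputstream(bs: ByteString): Source[ByteString, Future[IOResult]] =
  // Get some InputStream from the ByteString
  StreamConverters.fromInputStream(() => ???)

// Flow[ByteString] creates the identity flow that flatMapConcat is called on
val myFlow: Flow[ByteString, ByteString, NotUsed] =
  Flow[ByteString].flatMapConcat(bs => readInputstream(bs))

// And use it like this:
val source: Source[ByteString, NotUsed] = ???
source
  .via(myFlow)
  .to(???)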
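
To make the flatMapConcat idea concrete, here is a minimal runnable sketch. It assumes Akka 2.6 or later (where an implicit ActorSystem is enough to materialize a stream). The `callApi` function is a hypothetical stand-in for the real API call, which the question does not show; here it simply upper-cases the bytes and returns them as an InputStream. The object name `FlatMapConcatSketch` and the helper `run` are also invented for illustration.

```scala
import java.io.{ByteArrayInputStream, InputStream}

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Flow, Source, StreamConverters}
import akka.util.ByteString

import scala.concurrent.Await
import scala.concurrent.duration._

object FlatMapConcatSketch {

  // Hypothetical stand-in for the real API call (assumption): it upper-cases
  // the payload and hands the result back as an InputStream.
  def callApi(bs: ByteString): InputStream =
    new ByteArrayInputStream(bs.utf8String.toUpperCase.getBytes("UTF-8"))

  // For every incoming ByteString, open the InputStream returned by the API
  // and emit its contents downstream, one sub-source after another.
  val myFlow: Flow[ByteString, ByteString, NotUsed] =
    Flow[ByteString].flatMapConcat { bs =>
      StreamConverters.fromInputStream(() => callApi(bs))
    }

  // Runs two elements through the flow and concatenates everything emitted.
  def run(): String = {
    implicit val system: ActorSystem = ActorSystem("sketch")
    try {
      val result = Source(List(ByteString("foo"), ByteString("bar")))
        .via(myFlow)
        .runFold(ByteString.empty)(_ ++ _)
      Await.result(result, 5.seconds).utf8String
    } finally system.terminate()
  }
}
```

Calling `FlatMapConcatSketch.run()` should yield "FOOBAR" with this stand-in. Note that the API is called once per upstream element, so if the real API needs the whole input at once, the elements would have to be folded into a single ByteString before the flatMapConcat step.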