Home > other >  Convert a Source to Flow in Scala
Convert a Source to Flow in Scala

Time:10-31

How to convert a Source to Flow?

Input: Source[ByteString,NotUsed] a Intermediary Step: Call an API which returns an InputStream Output: Flow[ByteString,ByteString,NotUsed]

I am doing it as: Type of input is = Source[ByteString,NotUsed]

val sink: Sink[ByteString,InputStream] = StreamConverters.asInputStream() 
val output: InputStream = <API CALL>    
val mySource: Source[ByteString,Future[IOResult]] = StreamConverters.fromInputStream(() => output)
val myFlow: Flow[ByteString,ByteString,NotUsed] = Flow.fromSinkAndSource(sink,source)

When I use the above Flow in the source it returns an empty result. Can someone help me figure out of I am doing it right?

CodePudding user response:

I'm not sure tu fully grasp what you want to achieve but maybe this is a use case for flatMapConcat:

def readInputstream(bs: ByteString): Source[ByteString, Future[IOResult]] =
  // Get some IS from the ByteString
  StreamConverters.fromInputStream(() => ???)

val myFlow: Flow[ByteString, ByteString, NotUsed] = 
  Flow.flatMapConcat(bs => readInputstream(bs))

// And use it like this:
val source: Source[ByteString] = ???
source
  .via(myFlow)
  .to(???)
  • Related