How to combine data from two kafka topics ZStreams to one ZStream?


import org.slf4j.LoggerFactory
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.{Console, putStrLn}
import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscription}
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.serde.Serde
import zio.stream.ZStream
import zio.{ExitCode, Has, URIO, ZIO, ZLayer}

object Test2Topics extends zio.App {
  val logger = LoggerFactory.getLogger(this.getClass)

  val consumerSettings: ConsumerSettings =

  val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] =

  val streamString: ZStream[Any with Has[Consumer], Throwable, CommittableRecord[String, String]] =
      .plainStream(Serde.string, Serde.string)

  val streamInt: ZStream[Any with Has[Consumer], Throwable, CommittableRecord[String, String]] =
      .plainStream(Serde.string, Serde.string)
  val combined = streamString.zipWithLatest(streamInt)((a,b)=>(a,b))
  val program = for {
    fiber1 <- streamInt.tap(r => putStrLn(s"streamInt: ${r.toString}")).runDrain.forkDaemon
    fiber2 <- streamString.tap(r => putStrLn(s"streamString: ${r.toString}")).runDrain.forkDaemon
  } yield ZIO.raceAll(fiber1.join, List(fiber2.join))
  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    //combined.tap(r => putStrLn(s"Combined: ${r.toString}")).runDrain.provideSomeLayer(consumer ++ Console.live).exitCode
    program.provideSomeLayer(consumer ++ Console.live).exitCode
  }
}

When I try to combine the output of two topics, named test and topic, nothing is printed. Printing both streams in parallel does not work either, but printing just one stream at a time works.

Has anyone experienced anything like this?

CodePudding user response:

You are composing one shared layer that provides a single consumer instance, and then initializing that instance twice in a row to subscribe to two topics, one after the other. A single consumer instance should only be initialized once, so the code above will never work.

I believe setting up two independent compositions of consumer and stream, like this, will help:

val program = for {
    fiber1 <- streamInt.tap(r => putStrLn(s"streamInt: ${r.toString}")).runDrain.forkDaemon.provideSomeLayer(consumer)
    fiber2 <- streamString.tap(r => putStrLn(s"streamString: ${r.toString}")).runDrain.forkDaemon.provideSomeLayer(consumer)
  } yield {...}
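
To tie this back to the title of the question, here is a minimal sketch of one way to end up with a single ZStream: give each topic stream its own consumer by providing the layer at the stream level, then merge the two streams. It assumes the ZIO 1.x / zio-kafka 0.x API used in the question. The broker address localhost:9092, the group id test-group, and the names MergeTwoTopics, topicStream and merged are placeholders of mine, not taken from the original code.

```scala
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.putStrLn
import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscription}
import zio.kafka.serde.Serde
import zio.stream.ZStream
import zio.{ExitCode, Has, URIO, ZLayer}

object MergeTwoTopics extends zio.App {
  // Assumption: placeholder broker address and group id, adjust to your setup.
  val consumerSettings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092")).withGroupId("test-group")

  // A layer is only a description; every provideLayer call below builds it
  // again, so each stream ends up with its own Consumer instance.
  val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] =
    ZLayer.fromManaged(Consumer.make(consumerSettings))

  // One subscription per consumer: the layer is provided per stream,
  // not once for the whole program.
  def topicStream(
    topic: String
  ): ZStream[Clock with Blocking, Throwable, CommittableRecord[String, String]] =
    Consumer
      .subscribeAnd(Subscription.topics(topic))
      .plainStream(Serde.string, Serde.string)
      .provideLayer(consumer)

  // merge interleaves records as they arrive from either topic.
  val merged: ZStream[Clock with Blocking, Throwable, CommittableRecord[String, String]] =
    topicStream("test").merge(topicStream("topic"))

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
    merged
      .tap(r => putStrLn(s"${r.record.topic()}: ${r.record.value()}"))
      .runDrain
      .exitCode
}
```

merge emits every record from either topic as soon as it arrives. zipWithLatest, as used in the question, only starts emitting once both streams have produced at least one element, so it stays silent while either topic is empty.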