How to combine data from two Kafka topic ZStreams into one ZStream?

Time:07-07

import org.slf4j.LoggerFactory
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.{Console, putStrLn}
import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscription}
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.serde.Serde
import zio.stream.ZStream
import zio.{ExitCode, Has, URIO, ZIO, ZLayer}

object Test2Topics extends zio.App {
  val logger = LoggerFactory.getLogger(this.getClass)

  val consumerSettings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId(s"consumer-${java.util.UUID.randomUUID().toString}")
      .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))

  val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] =
    ZLayer.fromManaged(Consumer.make(consumerSettings))

  val streamString: ZStream[Any with Has[Consumer], Throwable, CommittableRecord[String, String]] =
    Consumer.subscribeAnd(Subscription.topics("test"))
      .plainStream(Serde.string, Serde.string)

  val streamInt: ZStream[Any with Has[Consumer], Throwable, CommittableRecord[String, String]] =
    Consumer.subscribeAnd(Subscription.topics("topic"))
      .plainStream(Serde.string, Serde.string)
  val combined = streamString.zipWithLatest(streamInt)((a,b)=>(a,b))
  val program = for {
    fiber1 <- streamInt.tap(r => putStrLn(s"streamInt: ${r.toString}")).runDrain.forkDaemon
    fiber2 <- streamString.tap(r => putStrLn(s"streamString: ${r.toString}")).runDrain.forkDaemon
  } yield ZIO.raceAll(fiber1.join, List(fiber2.join))
  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    //combined.tap(r => putStrLn(s"Combined: ${r.toString}")).runDrain.provideSomeLayer[zio.ZEnv](consumer ++ Console.live).exitCode
    program.provideSomeLayer[zio.ZEnv](consumer ++ Console.live).exitCode
  }
}

When I try to combine the output of the two topics, `test` and `topic`, nothing is printed. Printing both streams in parallel doesn't work either, but printing just one stream at a time works.

Did anyone experience anything like this?

Answer:

You are composing one shared layer that provides a single consumer instance, and then initializing that same instance twice, one after the other, to subscribe to the two topics. A single consumer instance should only be initialized once, so the code above cannot work as written.
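The difference between one shared layer and independently provided layers can be seen without a Kafka broker. Below is a minimal sketch in plain ZIO 1.x (assumed to be the version in use, since the question relies on `zio.App` and `Has`). `FakeConsumer`, `built` and `LayerSharingSketch` are hypothetical stand-ins for the Kafka consumer: every time the layer is built, the counter increases and a new instance is created.

```scala
import java.util.concurrent.atomic.AtomicInteger
import zio.{Has, Runtime, UIO, ULayer, URIO, ZIO, ZLayer}

object LayerSharingSketch {
  // Hypothetical stand-in for a Kafka consumer; `id` identifies the instance.
  final case class FakeConsumer(id: Int)

  // Counts how many times the layer below has been built.
  val built = new AtomicInteger(0)

  // Building this layer creates a fresh FakeConsumer with the next id.
  val fakeConsumer: ULayer[Has[FakeConsumer]] =
    ZLayer.fromEffect(ZIO.effectTotal(FakeConsumer(built.incrementAndGet())))

  // An effect that needs a consumer and reports which instance it received.
  val useConsumer: URIO[Has[FakeConsumer], Int] =
    ZIO.service[FakeConsumer].map(_.id)

  // One provideLayer around both uses: the layer is built once, so both see the same instance.
  val shared: UIO[(Int, Int)] =
    useConsumer.zip(useConsumer).provideLayer(fakeConsumer)

  // One provideLayer per use: the layer is built twice, so each use gets its own instance.
  val independent: UIO[(Int, Int)] =
    useConsumer.provideLayer(fakeConsumer).zip(useConsumer.provideLayer(fakeConsumer))

  def main(args: Array[String]): Unit = {
    val (a, b) = Runtime.default.unsafeRun(shared)
    val (c, d) = Runtime.default.unsafeRun(independent)
    println(s"shared: $a, $b; independent: $c, $d")
  }
}
```

In the question, `provideSomeLayer(consumer ++ Console.live)` is applied once to a program that uses both streams, which corresponds to the `shared` case: both subscriptions go through the same consumer.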

I believe setting up two independent consumer-to-stream compositions, each with its own consumer layer, like this will help:

val program = for {
    // Provide the layer before forking, so each consumer lives as long as the forked fiber that uses it.
    fiber1 <- streamInt.tap(r => putStrLn(s"streamInt: ${r.toString}")).runDrain.provideSomeLayer[zio.ZEnv](consumer).forkDaemon
    fiber2 <- streamString.tap(r => putStrLn(s"streamString: ${r.toString}")).runDrain.provideSomeLayer[zio.ZEnv](consumer).forkDaemon
  } yield {...}
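Since the title asks for a single ZStream, the same idea can be applied to the streams themselves instead of to forked fibers. This is a minimal sketch, assuming the ZIO 1.x / zio-kafka API already used in the question (`Consumer.subscribeAnd`, `plainStream`, `ZLayer.fromManaged(Consumer.make(...))`) and a broker on `localhost:9092`; `CombinedTopics` and `topicValues` are hypothetical names. The consumer layer is provided to each topic's stream separately, so each stream builds its own consumer, and the two streams are then combined with `merge`.

```scala
import zio.{ExitCode, Has, URIO, ZEnv, ZLayer}
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.putStrLn
import zio.kafka.consumer.{Consumer, ConsumerSettings, Subscription}
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.serde.Serde
import zio.stream.ZStream

object CombinedTopics extends zio.App {
  val consumerSettings: ConsumerSettings =
    ConsumerSettings(List("localhost:9092"))
      .withGroupId(s"consumer-${java.util.UUID.randomUUID().toString}")
      .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))

  // A layer description only: every provideSomeLayer call below builds a separate Consumer.
  val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] =
    ZLayer.fromManaged(Consumer.make(consumerSettings))

  // Hypothetical helper: streams (topic, value) pairs from one topic, using a consumer built for this stream alone.
  def topicValues(topic: String): ZStream[ZEnv, Throwable, (String, String)] =
    Consumer
      .subscribeAnd(Subscription.topics(topic))
      .plainStream(Serde.string, Serde.string)
      .map(r => (topic, r.value))
      .provideSomeLayer[ZEnv](consumer)

  // Both topics in one ZStream: elements are emitted as they arrive from either side.
  val combined: ZStream[ZEnv, Throwable, (String, String)] =
    topicValues("test").merge(topicValues("topic"))

  override def run(args: List[String]): URIO[ZEnv, ExitCode] =
    combined
      .tap { case (topic, value) => putStrLn(s"$topic: $value") }
      .runDrain
      .exitCode
}
```

`merge` emits records from either topic as soon as they arrive, whereas `zipWithLatest`, as used in the question, only starts emitting once both streams have produced at least one element. If one consumer is preferred instead, `Subscription.topics` accepts several topic names, so a single subscription covering both topics (told apart via `r.record.topic`) is another option. Separately, the question's `program` yields `ZIO.raceAll(...)` as a value without running it, so the fibers are never joined; binding it as a `_ <- ...` step inside the for comprehension would run it.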