import org.slf4j.LoggerFactory
import zio.blocking.Blocking
import zio.clock.Clock
import zio.console.{Console, putStrLn}
import zio.kafka.consumer.{CommittableRecord, Consumer, ConsumerSettings, Subscription}
import zio.kafka.consumer.Consumer.{AutoOffsetStrategy, OffsetRetrieval}
import zio.kafka.serde.Serde
import zio.stream.ZStream
import zio.{ExitCode, Has, URIO, ZIO, ZLayer}
object Test2Topics extends zio.App {
val logger = LoggerFactory.getLogger(this.getClass)
val consumerSettings: ConsumerSettings =
ConsumerSettings(List("localhost:9092"))
.withGroupId(s"consumer-${java.util.UUID.randomUUID().toString}")
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] =
ZLayer.fromManaged(Consumer.make(consumerSettings))
val streamString: ZStream[Any with Has[Consumer], Throwable, CommittableRecord[String, String]] =
Consumer.subscribeAnd(Subscription.topics("test"))
.plainStream(Serde.string, Serde.string)
val streamInt: ZStream[Any with Has[Consumer], Throwable, CommittableRecord[String, String]] =
Consumer.subscribeAnd(Subscription.topics("topic"))
.plainStream(Serde.string, Serde.string)
val combined = streamString.zipWithLatest(streamInt)((a,b)=>(a,b))
val program = for {
fiber1 <- streamInt.tap(r => putStrLn(s"streamInt: ${r.toString}")).runDrain.forkDaemon
fiber2 <- streamString.tap(r => putStrLn(s"streamString: ${r.toString}")).runDrain.forkDaemon
} yield ZIO.raceAll(fiber1.join, List(fiber2.join))
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
//combined.tap(r => putStrLn(s"Combined: ${r.toString}")).runDrain.provideSomeLayer(consumer Console.live).exitCode
program.provideSomeLayer(consumer Console.live).exitCode
}
}
Somehow when i try to combine the output from two topics with names test and topic i dont get any output printed out, and also when i try to print both streams in parallel that also doesnt work, but if i print just one stream at a time it works.
Did anyone experience anything like this?
CodePudding user response:
You are composing 1 shared layer that provides one instance of a consumer and initialize this instance twice after eachother to subscribe to 2 topics one after the other. A single consumer instance should only be initialized once, so the above code will never work.
I believe setting up 2 independent compositions of consumer to stream like this will help:
val program = for {
fiber1 <- streamInt.tap(r => putStrLn(s"streamInt: ${r.toString}")).runDrain.forkDaemon.provideSomeLayer(consumer)
fiber2 <- streamString.tap(r => putStrLn(s"streamString: ${r.toString}")).runDrain.forkDaemon.provideSomeLayer(consumer)
} yield {...}