I'm on a journey to deepen my knowledge of fs2, and I want to try fs2-kafka for a use case where I would replace Akka Streams. The idea is simple: read from Kafka, post the data via an HTTP request to a sink, then commit back to Kafka on success. So far I can't really figure out the HTTP part. In Akka Streams / Akka HTTP you get a flow for that out of the box: https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool
Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]
which integrates flawlessly with Akka Streams.
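To give an idea, the host-level pattern I want to replicate looks roughly like this (a simplified sketch based on the linked docs; the host name and the Long correlation type are placeholders):

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.util.Try

implicit val system: ActorSystem = ActorSystem()

// One pooled connection flow per target host; T correlates requests with responses.
val poolFlow: Flow[(HttpRequest, Long), (Try[HttpResponse], Long), Http.HostConnectionPool] =
  Http().cachedHostConnectionPool[Long]("example.com")

Source(List(HttpRequest(uri = "/sink") -> 1L))
  .via(poolFlow)
  .runWith(Sink.foreach { case (triedResponse, id) => println(s"$id -> $triedResponse") })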
I was trying to see if I could do something similar with http4s and fs2.
Does anyone have any reference, code sample, blog post or the like that shows how to do that kind of integration? So far the only thing I could think of was wrapping the stream into the use method of the client resource, i.e.
BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }
Even then I am not sure about the whole approach.
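For concreteness, a rough sketch of what I mean (untested; the import path for BlazeClientBuilder is org.http4s.blaze.client in http4s 0.23.x, and kafkaToHttp is just a placeholder for the Kafka pipeline):

import cats.effect.IO
import cats.effect.unsafe.IORuntime
import fs2.Stream
import org.http4s.client.Client
import org.http4s.blaze.client.BlazeClientBuilder

// placeholder for the Kafka -> HTTP pipeline
def kafkaToHttp(client: Client[IO]): Stream[IO, Unit] = ???

val program: IO[Unit] =
  BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { client =>
    kafkaToHttp(client).compile.drain
  }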
CodePudding user response:
The thing with the Typelevel ecosystem is that everything is just a library: you don't need examples of how any two of them interact, you just need to understand how each library works and the basic rules of composition.
import cats.effect.{IO, IOApp, Resource}
import fs2.Stream
import fs2.kafka.{commitBatchWithin, CommittableConsumerRecord}
import org.http4s.client.Client

def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice;
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder
  ???
}
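For example, a possible fill-in using the ember client with its defaults (a sketch, assuming http4s 0.23 and no custom configuration):

import cats.effect.{IO, Resource}
import org.http4s.client.Client
import org.http4s.ember.client.EmberClientBuilder

// The ember client already maintains a connection pool, similar to the Akka host connection pool.
def createClient(): Resource[IO, Client[IO]] =
  EmberClientBuilder.default[IO].build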
def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
  ???
}
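A possible fill-in, assuming for the sketch that Data can be sent as a plain String body, that Result is simply the response Status, and that the sink URI is a placeholder:

import cats.effect.IO
import org.http4s.{Method, Request, Status}
import org.http4s.client.Client
import org.http4s.implicits._

type Data   = String // assumption for the sketch
type Result = Status // so that result.isSuccess below is just Status#isSuccess

def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] =
  client.status(
    Request[IO](method = Method.POST, uri = uri"http://localhost:8080/sink")
      .withEntity(data)
  )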
def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
  ???
}
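A possible fill-in, assuming Key and Data are plain strings and that the bootstrap servers, group id and topic are placeholders:

import cats.effect.IO
import fs2.Stream
import fs2.kafka._

type Key  = String
type Data = String

def getStreamOfRecords(): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  val settings =
    ConsumerSettings[IO, Key, Data]
      .withAutoOffsetReset(AutoOffsetReset.Earliest)
      .withBootstrapServers("localhost:9092")
      .withGroupId("my-group")

  // Create the consumer, subscribe, and emit committable records.
  KafkaConsumer
    .stream(settings)
    .evalTap(_.subscribeTo("my-topic"))
    .flatMap(_.stream)
}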
def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.resource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record.value).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}
object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}
Note that I wrote all of this off the top of my head and with just a quick glance at the documentation, so you will need to change many things (especially types like Data and Result), as well as tune things like error handling and when to commit back to Kafka.
However, I expect this helps you to get an idea of how to structure your code.
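For instance (a sketch with made-up numbers, using stubs in place of the defs above): you could turn a failed request into a skipped commit rather than a stream failure, and commit at most every 500 offsets or every 15 seconds, whichever comes first:

import scala.concurrent.duration._
import cats.effect.IO
import fs2.Stream
import fs2.kafka.{commitBatchWithin, CommittableConsumerRecord}
import org.http4s.Status
import org.http4s.client.Client

// stubs standing in for the defs sketched above
def getStreamOfRecords(): Stream[IO, CommittableConsumerRecord[IO, String, String]] = ???
def sendHttpRequest(client: Client[IO])(data: String): IO[Status] = ???

def program(client: Client[IO]): Stream[IO, Unit] =
  getStreamOfRecords()
    .evalMapFilter { committable =>
      // .attempt keeps a failed HTTP call from terminating the whole stream
      sendHttpRequest(client)(data = committable.record.value).attempt.map {
        case Right(status) if status.isSuccess => Some(committable.offset)
        case _ => None // offset not committed, so the record is seen again after a restart/rebalance
      }
    }
    .through(commitBatchWithin[IO](500, 15.seconds))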