Home > database >  A journey from akka-stream to fs2 - how to define an akka-stream http flow like stage in fs2 using h
A journey from akka-stream to fs2 - how to define an akka-stream http flow like stage in fs2 using h

Time:10-10

i'm on my journey to deepen my knowledge in fs2, and want to try fs2-kafka for a use case where i would replace akka stream. The idea is simple, read from kafka and post data via http request to a sink, then commit back to kafka on success. So far i can't really figure out the http part. In akka stream / akka http you have out of the box a flow for that https://doc.akka.io/docs/akka-http/current/client-side/host-level.html#using-a-host-connection-pool

Flow[(HttpRequest, T), (Try[HttpResponse], T), HostConnectionPool]

Which integrate flawlessly with akka stream.

I was trying to see if i could do something similar with http4s and fs2 .

Does anyone has any reference, code sample, blog and what not that shows how to do that kind of integration. So far the only thing i could think of was, wrapping the the stream into the use method of the client resource i.e

BlazeClientBuilder[IO](IORuntime.global.compute).resource.use { ..... run stream here ..... }

Even then i am not sure of the entire thing

CodePudding user response:

The thing with the typelevel ecosystem is that everything is just a library, you don't need examples on how many of them interact together, you just need to understand how each library works and the basic rules of composition.

def createClient(/** whatever arguments you need */): Resource[IO, Client[IO]] = {
  // Fill this based on the documentation of the client of your choice:
  // I would recommend the ember client from http4s:
  // https://http4s.org/v0.23/api/org/http4s/ember/client/emberclientbuilder 
}


def sendHttpRequest(client: Client[IO])(data: Data): IO[Result] = {
  // Fill this based on the documentation of your client:
  // https://http4s.org/v0.23/client/
  // https://http4s.org/v0.23/api/org/http4s/client/client
}

def getStreamOfRecords(/** whatever arguments you need */): Stream[IO, CommittableConsumerRecord[IO, Key, Data]] = {
  // Fill this based on the documentation of fs2-kafka:
  // https://fd4s.github.io/fs2-kafka/docs/consumers
}

def program(/** whatever arguments you need */): Stream[IO, Unit] = {
  // Based on the documentation of fs2 and fs2-kafka I would guess something like this:
  Stream.fromResource(createClient(...)).flatMap { client =>
    getStreamOfRecords(...).evalMapFilter { committable =>
      sendHttpRequest(client)(data = committable.record).map { result =>
        if (result.isSuccess) Some(committable.offset)
        else None
      }
    }.through(commitBatchWithin(...))
  }
}

object Main extends IOApp.Simple {
  override final val run: IO[Unit] =
    program(...).compile.drain
}

Note that I wrote all this on top of my head and with just a quick glimpse of the documentation, you need to change many things (especially types, like Data & Result). As well as tunning things like error handling and when to commit back to Kafka.
However, I expect this helps you to get an idea of how to structure your code.

  • Related