AKKA, Kafka Source to Kinesis Sink using KPL

Time:09-28

I'm trying to use Akka to consume from a Kafka topic and send the records to a Kinesis stream using the KPL (Kinesis Producer Library).

I've created a source for the Kafka topic:

ActorSystem actorSystem = ActorSystemFactory.getClassicActorSystem();

KafkaService kafkaService = new KafkaService(actorSystem);
Source<Message, Consumer.Control> kafkaSource = kafkaService.createSource(runtimeConfiguration.getSourceKafkaTopic());

Now I would like to send the records to a Kinesis stream using some type of sink. The issue with the "native" Akka Kinesis sink is that it does not support the KPL, so I'm trying to use this dependency:

<!-- https://mvnrepository.com/artifact/com.github.j5ik2o/akka-kinesis-kpl -->
<dependency>
    <groupId>com.github.j5ik2o</groupId>
    <artifactId>akka-kinesis-kpl_2.13</artifactId>
    <version>1.0.252</version>
</dependency>

But it only has three classes that could be relevant to my purpose: KPLFlow, KPLFlowSettings and KPLFlowStage.

I've never used Flows in Akka. Is there a way to create a sink from the flow, or at least to use the flow to send records to the correct Kinesis stream?

The KPLFlowStage

extends GraphStageWithMaterializedValue[FlowShape[UserRecord, UserRecordResult], Future[KinesisProducer]]

From experimenting with it, it looks like KPLFlow expects a UserRecord as input, which would mean the record must already be prepared for sending to a Kinesis stream.

Answer:

I have no experience with Kinesis, but you can turn a Flow<In, Out, Mat> (e.g. a Flow<UserRecord, UserRecordResult, Future<KinesisProducer>>) into a Sink<In, Pair<Mat, CompletionStage<Done>>> by attaching Sink.ignore() and keeping both materialized values:

Sink<UserRecord, Pair<scala.concurrent.Future<KinesisProducer>, CompletionStage<Done>>> sink =
    flow.toMat(Sink.ignore(), Keep.both());

Since the KPLFlowStage appears to materialize a Scala Future and the dependency pulls in the Scala 2.13 library, you can convert that Future so the sink gives you a pair of CompletionStages:

import scala.jdk.javaapi.FutureConverters;

Sink<UserRecord, Pair<CompletionStage<KinesisProducer>, CompletionStage<Done>>> sink =
    flow.mapMaterializedValue(scalaFuture -> FutureConverters.asJava(scalaFuture))
        .toMat(Sink.ignore(), Keep.both());
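
To feed such a sink from the Kafka source, each message still has to be turned into a UserRecord first, since that is the flow's input type. As far as I know, a KPL UserRecord is built from a stream name, a partition key and a ByteBuffer payload. Below is a minimal, stdlib-only sketch of that preparation step. PreparedRecord is a hypothetical stand-in for UserRecord, and the stream name, key and value are made up, because the question does not show what Message contains:

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Objects;

final class RecordPreparation {

    /** Hypothetical stand-in for the KPL UserRecord: stream name, partition key, payload. */
    static final class PreparedRecord {
        final String streamName;
        final String partitionKey;
        final ByteBuffer data;

        PreparedRecord(String streamName, String partitionKey, ByteBuffer data) {
            this.streamName = streamName;
            this.partitionKey = partitionKey;
            this.data = data;
        }
    }

    /**
     * Builds the three pieces from a message key and value.
     * Assumption: both key and value are available as strings.
     */
    static PreparedRecord prepare(String streamName, String messageKey, String messageValue) {
        Objects.requireNonNull(streamName, "streamName");
        // Kinesis uses the partition key to pick a shard, so it must not be missing.
        Objects.requireNonNull(messageKey, "messageKey");
        Objects.requireNonNull(messageValue, "messageValue");

        // Encode the payload explicitly as UTF-8 rather than relying on the platform default.
        ByteBuffer data = ByteBuffer.wrap(messageValue.getBytes(StandardCharsets.UTF_8));
        return new PreparedRecord(streamName, messageKey, data);
    }

    public static void main(String[] args) {
        PreparedRecord record = prepare("my-stream", "order-42", "hello");
        System.out.println(record.streamName + " / " + record.partitionKey
                + " / " + record.data.remaining() + " bytes");
    }
}
```

In the actual stream, the same three values would go into a UserRecord inside a map stage placed between kafkaSource and the sink. How to read the key and value from Message depends on that class, so that part is left open here.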