I'm trying to use AKKA to consume from a kafka topic and send to a Kinesis Stream using KPL.
I've created a source for the kafka topic.
ActorSystem actorSystem = ActorSystemFactory.getClassicActorSystem();
KafkaService kafkaService = new kafkaService(actorSystem);
Source<Message, Consumer.Control> kafkaSource = kakfaService.createSource(runtimeConfiguration.getSourceKafkaTopic());
Now I would like to send the records to a Kinesis Stream using some type of sink. The issue with the "native" akka kinesis sink is that it does not support KPL. I'm trying to use this dependency
<!-- https://mvnrepository.com/artifact/com.github.j5ik2o/akka-kinesis-kpl -->
<dependency>
<groupId>com.github.j5ik2o</groupId>
<artifactId>akka-kinesis-kpl_2.13</artifactId>
<version>1.0.252</version>
</dependency>
But it only has three classes could be relevant to my purpose. KPLFlow, KPLFlowSettings and KPLFlowStage.
I've never used Flows in akka, is there a way to create a sink from the flow, or at most be able with the flow to setup a way to send to the correct kinesis stream?
The KPLFlowStage
extends GraphStageWithMaterializedValue[FlowShape[UserRecord, UserRecordResult], Future[KinesisProducer]]
From messing around with it, looks like KPLFlow is expecting UserRecord, that would mean the record is already prepared to send to a kinesis stream.
CodePudding user response:
No experience with Kinesis, but you can turn a Flow<In, Out, Mat>
(e.g. a Flow<UserRecord, UserRecordResult, Future<KinesisProducer>>
) into a Sink<In, Pair<Out, CompletionStage<Done>>
with
Sink<UserRecord, Pair<scala.concurrent.Future<KinesisProducer>, CompletionStage<Done>>> sink =
flow.toMat(Sink.ignore(), Keep.both())
Since the KPLFlowStage
appears to materialize as a Scala Future
and pulls in Scala 2.13's library, this should give you a pair of CompletionStage
s:
import scala.jdk.javaapi.FutureConverters
Sink<UserRecord, Pair<CompletionStage<KinesisProducer>, CompletionStage<Done>>> sink =
flow.mapMaterializedValue(scalaFuture -> FutureConverters.asJava(scalaFuture))
.toMat(Sink.ignore(), Keep.both())