I am working with a Java API from a data vendor providing real time streams. I would like to process this stream using Akka streams.
The Java API has a pub sub design and roughly works like this:
Subscription sub = createSubscription();
sub.addListener(new Listener() {
public void eventsReceived(List events) {
for (Event e : events)
buffer.enqueue(e)
}
});
I have tried to imbed the creation of this subscription and accompanying buffer in a custom graph stage without much success. Can anyone guide me on the best way to interface with this API using Akka? Is Akka Streams the best tool here?
CodePudding user response:
To feed a Source
, you don't necessarily need to use a custom graph stage. Source.queue
will materialize as a buffered queue to which you can add elements which will then propagate through the stream.
There are a couple of tricky things to be aware of. The first is that there's some subtlety around materializing the Source.queue
so you can set up the subscription. Something like this:
def bufferSize: Int = ???
Source.fromMaterializer { (mat, att) =>
val (queue, source) = Source.queue[Event](bufferSize).preMaterialize()(mat)
val subscription = createSubscription()
subscription.addListener(
new Listener() {
def eventsReceived(events: java.util.List[Event]): Unit = {
import scala.collection.JavaConverters.iterableAsScalaIterable
import akka.stream.QueueOfferResult
iterableAsScalaIterable(events).foreach { event =>
queue.offer(event) match {
case Enqueued => () // do nothing
case Dropped => ??? // handle a dropped pubsub element, might well do nothing
case QueueClosed => ??? // presumably cancel the subscription...
}
}
}
}
)
source.withAttributes(att)
}
Source.fromMaterializer
is used to get access at each materialization to the materializer (which is what compiles the stream definition into actors). When we materialize, we use the materializer to preMaterialize
the queue source so we have access to the queue. Our subscription adds incoming elements to the queue.
The API for this pubsub doesn't seem to support backpressure if the consumer can't keep up. The queue will drop elements it's been handed if the buffer is full: you'll probably want to do nothing in that case, but I've called it out in the match that you should make an explicit decision here.
Dropping the newest element is the synchronous behavior for this queue (there are other queue implementations available, but those will communicate dropping asynchronously which can be really bad for memory consumption in a burst). If you'd prefer something else, it may make sense to have a very small buffer in the queue and attach the "overall" Source
(the one returned by Source.fromMaterializer
) to a stage which signals perpetual demand. For example, a buffer(downstreamBufferSize, OverflowStrategy.dropHead)
will drop the oldest event not yet processed. Alternatively, it may be possible to combine your Event
s in some meaningful way, in which case a conflate
stage will automatically combine incoming Event
s if the downstream can't process them quickly.
CodePudding user response:
Great answer! I did build something similar. There are also kamon metrics to monitor queue size exc.
class AsyncSubscriber(projectId: String, subscriptionId: String, metricsRegistry: CustomMetricsRegistry, pullParallelism: Int)(implicit val ec: Executor) {
private val logger = LoggerFactory.getLogger(getClass)
def bufferSize: Int = 1000
def source(): Source[(PubsubMessage, AckReplyConsumer), Future[NotUsed]] = {
Source.fromMaterializer { (mat, attr) =>
val (queue, source) = Source.queue[(PubsubMessage, AckReplyConsumer)](bufferSize).preMaterialize()(mat)
val receiver: MessageReceiver = {
(message: PubsubMessage, consumer: AckReplyConsumer) => {
metricsRegistry.inputEventQueueSize.update(queue.size())
queue.offer((message, consumer)) match {
case QueueOfferResult.Enqueued =>
metricsRegistry.inputQueueAddEventCounter.increment()
case QueueOfferResult.Dropped =>
metricsRegistry.inputQueueDropEventCounter.increment()
consumer.nack()
logger.warn(s"Buffer is full, message nacked. Pubsub should retry don't panic. If this happens too often, we should also tweak the buffer size or the autoscaler.")
case QueueOfferResult.Failure(ex) =>
metricsRegistry.inputQueueDropEventCounter.increment()
consumer.nack()
logger.error(s"Failed to offer message with id=${message.getMessageId()}", ex)
case QueueOfferResult.QueueClosed =>
logger.error("Destination Queue closed. Something went terribly wrong. Shutting down the jvm.")
consumer.nack()
mat.shutdown()
sys.exit(1)
}
}
}
val subscriptionName = ProjectSubscriptionName.of(projectId, subscriptionId)
val subscriber = Subscriber.newBuilder(subscriptionName, receiver).setParallelPullCount(pullParallelism).build
subscriber.startAsync().awaitRunning()
source.withAttributes(attr)
}
}
}