I am trying to write a custom Akka SnapshotStore plugin.
I am at the point where I want to implement this method:
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]
This is what I have so far:
import cats.data.OptionT
import cats.implicits._
...
override def loadAsync(
  persistenceId: String,
  criteria: SnapshotSelectionCriteria
): Future[Option[SelectedSnapshot]] = {
  // same as in the original plugin
  val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts)
  // need to get rid of this one!
  import scala.concurrent.ExecutionContext.Implicits.global
  val getSnapshotAndReportMetric = for {
    snapshot <- OptionT.fromOption[Future](getMaybeSnapshotFromMetadata(metadata))
    _ <- OptionT.liftF(Future {
      observabilityService
        .recordMetric(
          LongCounterMetric(readVehicleSnapshotFromDiskCounter, SnapShotDirectoryScannerCommandOptions)
        )
    }(directorySnapshotScanningDispatcher))
  } yield snapshot
  getSnapshotAndReportMetric.value.recoverWith {
    // retry if we listed an older snapshot that was deleted before loading
    case _: NoSuchFileException => loadAsync(persistenceId, criteria)
  }(streamDispatcher)
}
For reference, here is the signature of getMaybeSnapshotFromMetadata. Its signature can be modified to return a Future, but ultimately the wrapped result must be of type Option[SelectedSnapshot]:
private def getMaybeSnapshotFromMetadata(metadata: Seq[SnapshotMetadata]): Option[SelectedSnapshot]
The code as is compiles, but only because I have imported the implicit global ExecutionContext. My goal is to use different explicit execution contexts (different configurable dispatchers), but I don't see how to do that for the first line (the one that calls getMaybeSnapshotFromMetadata) within the for-comprehension. If I used OptionT.liftF then I could do it, e.g.
...
snapshot <- OptionT.liftF( Future { getMaybeSnapshotFromMetadata(metadata)}(streamDispatcher))
...
but then I get an OptionT[Future, Option[SelectedSnapshot]] as a result.
Is there a way to achieve what I want? If not, I can work around it with a plain Future and its andThen method:
Future {
  getMaybeSnapshotFromMetadata(metadata)
}(streamDispatcher)
  .andThen(selectedSnapshot => {
    observabilityService
      .recordMetric(
        LongCounterMetric(readVehicleSnapshotFromDiskCounter, SnapShotDirectoryScannerCommandOptions)
      )
    selectedSnapshot
  })(opentelemetryDispatcher)
  .recoverWith {
    // retry if we listed an older snapshot that was deleted before loading
    case _: NoSuchFileException => loadAsync(persistenceId, criteria)
  }(streamDispatcher)
Update: For the second line within the for-comprehension, liftF is actually a viable solution; I updated the code block.
Answer:
Do not overuse cats. It's a nice tool for some things, but when used improperly, it just adds complexity and hurts readability without any benefit.
Future(getMaybeSnapshotFromMetadata(metadata))(streamDispatcher)
  .andThen { case _ =>
    observabilityService.recordWiseMetric(...)
  }(directorySnapshotScanningDispatcher)
  .recoverWith(...)(streamDispatcher)
This is equivalent to your code, without all the complication. It is not a "workaround" but the proper way to write this.
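To make the explicit-dispatcher version concrete, here is a minimal, self-contained sketch that uses only the standard library. The names loadEc, metricsEc, loadSnapshot and metricCount are hypothetical stand-ins for your dispatchers, getMaybeSnapshotFromMetadata and observabilityService. One deliberate difference from the snippet above, on the assumption that you want to keep the behaviour of your for-comprehension: the andThen here matches only Success(Some(_)), so the metric is recorded only when a snapshot was actually loaded. With case _ it would also fire for None and for failed attempts.

```scala
import java.nio.file.NoSuchFileException
import java.util.concurrent.{Executors, ThreadFactory}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Success

object ExplicitDispatchers {
  // Daemon thread pools so the JVM can exit without an explicit shutdown.
  private def daemonPool(threads: Int): ExecutionContext = {
    val factory: ThreadFactory = (r: Runnable) => {
      val t = new Thread(r)
      t.setDaemon(true)
      t
    }
    ExecutionContext.fromExecutor(Executors.newFixedThreadPool(threads, factory))
  }

  // Hypothetical stand-ins for the configurable Akka dispatchers.
  val loadEc: ExecutionContext = daemonPool(2)
  val metricsEc: ExecutionContext = daemonPool(1)

  // Hypothetical stand-in for observabilityService.recordMetric(...).
  val metricCount = new AtomicInteger(0)
  private val attempts = new AtomicInteger(0)

  // Hypothetical loader: the first call fails as if the file had been
  // deleted after listing, every later call succeeds.
  def loadSnapshot(): Option[String] =
    if (attempts.incrementAndGet() == 1) throw new NoSuchFileException("snapshot-1")
    else Some("snapshot-2")

  // Every stage receives its ExecutionContext explicitly; nothing implicit is in scope.
  def load(): Future[Option[String]] =
    Future(loadSnapshot())(loadEc)
      // side effect only; the original result is passed through unchanged
      .andThen { case Success(Some(_)) => metricCount.incrementAndGet() }(metricsEc)
      // retry if the snapshot file disappeared before it could be read
      .recoverWith { case _: NoSuchFileException => load() }(loadEc)
}
```

Awaiting ExplicitDispatchers.load() yields Some("snapshot-2") with the metric counted once: the first attempt fails, so andThen skips it and recoverWith retries. In the real plugin you would pass the Akka dispatchers directly, since they are ExecutionContexts themselves.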