Use explicit execution context with OptionT[Future, _]

Time:06-15

I am trying to write a custom Akka SnapshotStore plugin.

I am at the point where I want to implement this method:

def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): Future[Option[SelectedSnapshot]]

This is what I have so far:

import cats.data.OptionT
import cats.implicits._
...  

override def loadAsync(
    persistenceId: String,
    criteria: SnapshotSelectionCriteria
  ): Future[Option[SelectedSnapshot]] = {
    // same as in the original plugin
    val metadata = snapshotMetadatas(persistenceId, criteria).sorted.takeRight(maxLoadAttempts)
    // need to get rid of this one! 
    import scala.concurrent.ExecutionContext.Implicits.global

    val getSnapshotAndReportMetric = for {
      snapshot <- OptionT.fromOption[Future](getMaybeSnapshotFromMetadata(metadata))
      _ <- OptionT.liftF(Future {
        observabilityService
          .recordMetric(
            LongCounterMetric(readVehicleSnapshotFromDiskCounter, SnapShotDirectoryScannerCommandOptions)
          )
      }(directorySnapshotScanningDispatcher))
    } yield snapshot

    getSnapshotAndReportMetric.value.recoverWith {
      // retry if we listed an older snapshot that was deleted before loading
      case _: NoSuchFileException => loadAsync(persistenceId, criteria)
    }(streamDispatcher)
  }

For reference, here is the signature of getMaybeSnapshotFromMetadata. Its signature can be modified to return a Future, but ultimately the wrapped response must be of type Option[SelectedSnapshot].

private def getMaybeSnapshotFromMetadata(metadata: Seq[SnapshotMetadata]): Option[SelectedSnapshot]

The code as is compiles, but only because I have imported the implicit global ExecutionContext. My goal is to use different explicit execution contexts (different configurable dispatchers), but I don't see how to do it for the first line (the one that calls getMaybeSnapshotFromMetadata) within for-comprehensions. If I used OptionT.liftF then I could do it, e.g.

...
snapshot <- OptionT.liftF( Future { getMaybeSnapshotFromMetadata(metadata)}(streamDispatcher))

...but then I get an OptionT[Future, Option[SelectedSnapshot]] as a result.
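One way to avoid the nested Option is to skip liftF for that step and pass the Future[Option[A]] straight to the OptionT constructor, which accepts exactly that shape. Below is a minimal sketch, assuming cats is on the classpath. The names loadSnapshot, recordMetric, metricsDispatcher and the thread pools are hypothetical stand-ins for the plugin's own pieces. Note that cats derives its Monad[Future] instance from an implicit ExecutionContext, so one still has to be in scope for the map/flatMap glue of the for-comprehension; it can be a chosen dispatcher rather than the global one.

```scala
import cats.data.OptionT
import cats.implicits._
import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._

object OptionTExplicitEc {
  // Hypothetical stand-ins for the configurable dispatchers in the question.
  val streamDispatcher: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  val metricsDispatcher: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  // Hypothetical stand-in for getMaybeSnapshotFromMetadata.
  def loadSnapshot(ids: Seq[Int]): Option[String] =
    ids.lastOption.map(id => s"snapshot-$id")

  // Hypothetical stand-in for observabilityService.recordMetric.
  def recordMetric(): Unit = ()

  def load(ids: Seq[Int]): Future[Option[String]] = {
    // Used only by the Monad[Future] instance that drives map/flatMap.
    implicit val glueEc: ExecutionContext = streamDispatcher

    val program: OptionT[Future, String] = for {
      // OptionT.apply takes a Future[Option[A]], so no extra Option layer appears.
      snapshot <- OptionT(Future(loadSnapshot(ids))(streamDispatcher))
      // liftF wraps a Future[Unit]; the body runs on the explicit dispatcher.
      _ <- OptionT.liftF(Future(recordMetric())(metricsDispatcher))
    } yield snapshot

    program.value
  }

  def main(args: Array[String]): Unit =
    try println(Await.result(load(Seq(1, 2, 3)), 5.seconds))
    finally { streamDispatcher.shutdown(); metricsDispatcher.shutdown() }
}
```

In this sketch each Future body runs on the dispatcher passed to it explicitly, while the implicit one only schedules the small continuations between steps.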

Is there a solution to what I want to achieve? If not, I can work around it with plain Futures and their andThen method:

    Future {
      getMaybeSnapshotFromMetadata(metadata)
    }(streamDispatcher)
      .andThen(selectedSnapshot => {
        observabilityService
          .recordMetric(
            LongCounterMetric(readVehicleSnapshotFromDiskCounter, SnapShotDirectoryScannerCommandOptions)
          )
        selectedSnapshot
      })(opentelemetryDispatcher)
      .recoverWith {
        // retry if we listed an older snapshot that was deleted before loading
        case _: NoSuchFileException => loadAsync(persistenceId, criteria)
      }(streamDispatcher)

Update: For the second line within the for-comprehension, liftF is actually a viable solution, so I updated the code block.

CodePudding user response:

Do not overuse cats. It's a nice tool for some things, but when used improperly, it just adds complexity and hurts readability without any benefit.

    Future(getMaybeSnapshotFromMetadata(metadata))(streamDispatcher)
      .andThen { case _ =>
        observabilityService.recordMetric(...)
      }(directorySnapshotScanningDispatcher)
      .recoverWith(...)(streamDispatcher)

This is equivalent to your code without all the complication ... It is not a "workaround", but an actual proper way to write this.
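To make the chain above concrete, here is a runnable sketch that uses only the standard library. The dispatchers, loadSnapshot, and the two counters are hypothetical stand-ins; the loader fails once with NoSuchFileException to exercise the retry.

```scala
import java.nio.file.NoSuchFileException
import java.util.concurrent.Executors
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, ExecutionContext, ExecutionContextExecutorService, Future}
import scala.concurrent.duration._
import scala.util.Success

object PlainFutureSketch {
  // Hypothetical stand-ins for the configurable dispatchers.
  val streamDispatcher: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(2))
  val metricsDispatcher: ExecutionContextExecutorService =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  val attempts = new AtomicInteger(0)
  val metricCount = new AtomicInteger(0)

  // Hypothetical loader: the first call fails as if the file had been deleted.
  def loadSnapshot(): Option[String] =
    if (attempts.incrementAndGet() == 1) throw new NoSuchFileException("snapshot-1")
    else Some("snapshot-2")

  def load(): Future[Option[String]] =
    Future(loadSnapshot())(streamDispatcher)
      // Side effect only: andThen passes the original result through unchanged.
      .andThen { case Success(Some(_)) => metricCount.incrementAndGet() }(metricsDispatcher)
      // Retry when the listed snapshot disappeared before it could be read.
      .recoverWith { case _: NoSuchFileException => load() }(streamDispatcher)

  def main(args: Array[String]): Unit =
    try println(Await.result(load(), 5.seconds))
    finally { streamDispatcher.shutdown(); metricsDispatcher.shutdown() }
}
```

One difference worth keeping in mind: andThen with `case _` runs the side effect for every outcome, including failures and None, and an exception thrown inside it does not fail the resulting Future. The OptionT version records the metric only when a snapshot was found and fails if the metric Future fails. Matching on `Success(Some(_))`, as in this sketch, keeps the "only when found" behaviour.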
