Making calculations on Future requests based on priority

Time:02-17

I have a question about handling async operations and taking action based on priority.

Consider the following code:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] = Future {
  Some(1)
}

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] = {

  val domainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, domainHash))
  val subDomainFutures: Future[List[Option[Int]]] = Future.traverse(listIds)(listId => isSiteExcludedAtList(listId, subdomainHash))

  // Is there a better way to do this?
  for {
    res <- Future.sequence(
      List(
        subDomainFutures.map(res => "subdomain" -> res),
        domainFutures.map(res => "domain" -> res)
      )
    )
  } yield {
    val subdomainExclusion: List[Int] = res.filter(_._1 == "subdomain").flatMap(_._2).flatten
    val domainExclusion: List[Int] = res.filter(_._1 == "domain").flatMap(_._2).flatten
    if (subdomainExclusion.nonEmpty) {
      s"its subdomain exclusion with results: ${subdomainExclusion}"
    }
    else {
      s"its domain exclusion with results: ${domainExclusion}"
    }
  }
}

What I want to achieve:

  • isSiteExcludedAtList returns an Int from the database. It is mocked in my example, but it's actually an async call that fetches an int value from the database using a key that contains both listId and siteId.
  • I want to create subdomainFutures and domainFutures and start running them together.
  • I want to check whether there is a result from subdomainFutures; if so, it's a subdomain exclusion and I want to return it.
  • If none of the subdomainFutures returns a result, I want to check domainFutures and return a result based on them.

Note: stopping as soon as one subdomain result is found (instead of waiting for all of them) is an optional optimization.

Is there a prettier way to achieve this? Thanks!

Answer:

If I understood the problem correctly, you want to stop processing at the first call that returns a Some.

If you are open to using cats-effect, that is pretty easy to achieve like this:

import cats.effect.IO
import cats.syntax.all._

def isSiteExcludedAtList(listId: Int, siteId: Int): IO[Option[Int]] =
  IO.println(s"Computing for ${listId} - ${siteId}").as(Some(10))

def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): IO[Unit] = {
  val processSubdomains =
    listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = subdomainHash))

  val processDomains =
    listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = domainHash))

  processSubdomains.flatMap {
    case Some(subdomainExclusion) =>
      IO.println(s"Its subdomain exclusion with result: ${subdomainExclusion}")

    case None =>
      processDomains.flatMap {
        case Some(domainExclusion) =>
          IO.println(s"Its domain exclusion with result: ${domainExclusion}")

        case None =>
          IO.println("All subdomains and domains are included!")
      }
  }
}


Note: another approach would be to tag each computation with its origin (domain or subdomain), combine them all into one big list, and perform a single collectFirstSomeM; both approaches are equivalent.
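A minimal sketch of that tagging idea, using only the standard library, is below. Since it avoids cats, a small hand-written recursive helper (the hypothetical `collectFirstSome`) stands in for `collectFirstSomeM`; the `Origin` tags and the mock's behaviour (only list 2 excludes site 200) are also assumptions made for illustration. Each lookup is wrapped in a thunk so that a query only starts once the helper reaches it.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical mock of the database call: only list 2 excludes site 200
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] =
  Future { if (listId == 2 && siteId == 200) Some(listId) else None }

// Tag that records where a lookup came from
sealed trait Origin
case object Subdomain extends Origin
case object Domain extends Origin

// Plain-Scala stand-in for collectFirstSomeM: runs the lookups one by one
// and stops at the first Some. Lookups are thunks, so a query only starts
// when the recursion reaches it.
def collectFirstSome[A](lookups: List[(Origin, () => Future[Option[A]])]): Future[Option[(Origin, A)]] =
  lookups match {
    case Nil => Future.successful(None)
    case (origin, lookup) :: rest =>
      lookup().flatMap {
        case Some(a) => Future.successful(Some(origin -> a))
        case None    => collectFirstSome(rest)
      }
  }

def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] = {
  // Subdomain lookups come first in the combined list, so they take priority
  val subdomainLookups: List[(Origin, () => Future[Option[Int]])] =
    listIds.map(id => (Subdomain, () => isSiteExcludedAtList(id, subdomainHash)))
  val domainLookups: List[(Origin, () => Future[Option[Int]])] =
    listIds.map(id => (Domain, () => isSiteExcludedAtList(id, domainHash)))

  collectFirstSome(subdomainLookups ++ domainLookups).map {
    case Some((Subdomain, id)) => s"its subdomain exclusion with result: $id"
    case Some((Domain, id))    => s"its domain exclusion with result: $id"
    case None                  => "no exclusion"
  }
}

println(Await.result(isExcluded(List(1, 2, 3), subdomainHash = 100, domainHash = 200), 5.seconds))
// prints: its domain exclusion with result: 2
```

Because the subdomain entries come first in the combined list, any subdomain Some wins over every domain result, which is the priority rule from the question.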


I suppose you can do something similar with only Futures and cats:

import cats.syntax.all._
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// assumes isSiteExcludedAtList returns Future[Option[Int]]

def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[Unit] = {
  val processSubdomains =
    listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = subdomainHash))

  processSubdomains.flatMap {
    case Some(subdomainExclusion) =>
      Future.successful(println(s"Its subdomain exclusion with result: ${subdomainExclusion}"))

    case None =>
      val processDomains =
        listIds.collectFirstSomeM(listId => isSiteExcludedAtList(listId, siteId = domainHash))

      processDomains.map {
        case Some(domainExclusion) =>
          println(s"Its domain exclusion with result: ${domainExclusion}")

        case None =>
          println("All subdomains and domains are included!")
      }
  }
}

But I am not sure whether this guarantees that only one Future runs at a time and that no more Futures are executed than necessary.

Without cats you would need to use a custom foldLeft to get the same result.
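A possible shape for that foldLeft is sketched below, assuming isSiteExcludedAtList returns Future[Option[Int]]. The mock, the `calls` counter and the `firstExclusion` helper are hypothetical and only there to show how many lookups actually run. Each step of the fold flatMaps on the previous result, so the database is only queried while every earlier lookup came back with None.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.util.concurrent.atomic.AtomicInteger

// Illustration only: counts how many database lookups actually run
val calls = new AtomicInteger(0)

// Hypothetical mock of the database call: only list 2 excludes site 42
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] = Future {
  calls.incrementAndGet()
  if (listId == 2 && siteId == 42) Some(listId) else None
}

// foldLeft version of "collect first Some": each step waits for the previous
// result and only queries the database if nothing has been found yet
def firstExclusion(listIds: List[Int], siteId: Int): Future[Option[Int]] =
  listIds.foldLeft(Future.successful(Option.empty[Int])) { (acc, listId) =>
    acc.flatMap {
      case found @ Some(_) => Future.successful(found)
      case None            => isSiteExcludedAtList(listId, siteId)
    }
  }

// Subdomain hash first, domain hash only as a fallback
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  firstExclusion(listIds, subdomainHash).flatMap {
    case Some(id) => Future.successful(s"its subdomain exclusion with result: $id")
    case None =>
      firstExclusion(listIds, domainHash).map {
        case Some(id) => s"its domain exclusion with result: $id"
        case None     => "no exclusion"
      }
  }

println(Await.result(isExcluded(List(1, 2, 3, 4), subdomainHash = 7, domainHash = 42), 5.seconds))
// prints: its domain exclusion with result: 2
```

The fold still builds one flatMap step per listId up front; only the database calls after the first match are skipped.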

Answer:

Something like this maybe?

    subdomainFutures.map(_.flatten).flatMap { 
       case sds if (sds.nonEmpty) => Future.successful(sds -> Nil)
       case _ => domainFutures.map(_.flatten).map(Nil -> _)
    }.map {
      case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
      case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
      case _ => "no exclusion"
    }

Or, maybe, pull domain queries up to the same level too:

    subdomainFutures.zip(domainFutures)
      .map { case (s, d) => (s.flatten, d.flatten) }
      .map {
        case (sds, _) if (sds.nonEmpty) => s"subdomain exclusion $sds"
        case (_, ds) if (ds.nonEmpty) => s"domain exclusion $ds"
        case _ => "no exclusion"
      }

I think it's more or less the same thing you are doing, just expressed in a slightly more straightforward way, IMO.

One downside is that it will wait for all subdomain queries to come back even if the very first one returns a result (the second variant looks a little "slicker", but it also waits for all domain queries unconditionally, which is an additional inefficiency).

There are ways to optimize that (nothing is impossible!), but I can't think of any that wouldn't look excessively complicated for this use case.
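One possible optimization that still keeps all queries running in parallel is the standard library's `Future.find`, sketched below under the assumption that isSiteExcludedAtList returns Future[Option[Int]] (the mock and the hypothetical `firstSome` helper are for illustration only). It completes as soon as it reaches a Some, so it does not wait for the remaining subdomain lookups. Two caveats: as far as I know, since Scala 2.12 `Future.find` inspects the futures in list order, so it may still wait on an earlier, slower lookup; and it ignores failed futures, so a database error is treated like "not excluded".

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical mock of the database call: only list 2 excludes site 7
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] =
  Future { if (listId == 2 && siteId == 7) Some(listId) else None }

// First Some among already-running lookups; failed lookups are skipped by Future.find
def firstSome(lookups: List[Future[Option[Int]]]): Future[Option[Int]] =
  Future.find(lookups)(_.isDefined).map(_.flatten)

def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] = {
  // All queries start immediately and run in parallel, as in the question
  val subdomainLookups = listIds.map(isSiteExcludedAtList(_, subdomainHash))
  val domainLookups    = listIds.map(isSiteExcludedAtList(_, domainHash))

  firstSome(subdomainLookups).flatMap {
    case Some(id) => Future.successful(s"subdomain exclusion $id")
    case None =>
      firstSome(domainLookups).map {
        case Some(id) => s"domain exclusion $id"
        case None     => "no exclusion"
      }
  }
}

println(Await.result(isExcluded(List(1, 2, 3), subdomainHash = 7, domainHash = 8), 5.seconds))
// prints: subdomain exclusion 2
```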

Answer:

I'd like to describe how to improve your code a bit while still using futures, but I'm a bit confused about what this code is doing. What is the number that isSiteExcludedAtList returns? Is it an identifier, and do you want to collect identifiers for all list ids, your only concern being that you don't want to query using domainHash if subdomainHash is enough? That's what your code seems to be doing. But if I understand the cats-effect answer above (the one with collectFirstSomeM) correctly, that code looks only for the first result that is Some(number) and then stops. For example, if the very first call to isSiteExcludedAtList returns Some(1), no further calls are made.

So, I have three answers for you.

  1. This is if you want to collect a list of ints and only want to avoid calling isSiteExcludedAtList with domainHash when the calls with subdomainHash have already given you some results. In this case you can chain both Future.traverse calls and make the second one only if the first one returns no results.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] = 
  Future { Some(1) }

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  for {
    res1   <- Future.traverse(listIds)(isSiteExcludedAtList(_, subdomainHash))
    subIds =  res1.flatten
    res2   <- if (subIds.isEmpty) 
                Future.traverse(listIds)(isSiteExcludedAtList(_, domainHash)) 
              else 
                Future.successful(Nil)
    domIds =  res2.flatten
  } yield 
    if (subIds.nonEmpty)
      s"its subdomain exclusion with results: ${subIds}"
    else if (domIds.nonEmpty)
      s"its domain exclusion with results: ${domIds}"
    else
      "no exclusion"
  2. This is if you are looking for the first result indicating that the listId is excluded, and then you want to stop querying. In that case, all calls to isSiteExcludedAtList must be chained, i.e. you make the next call only when you get no result from the previous one. It can be done with recursion:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtList(listId: Int, siteId: Int): Future[Option[Int]] = 
  Future { Some(1) }

def isSiteExcludedAtList(listIds: List[Int], hash: Int): Future[Option[Int]] = 
  listIds match {
    case Nil => 
      Future.successful(None)
    case head :: tail => 
      isSiteExcludedAtList(head, hash).flatMap {
        case Some(id) => Future.successful(Some(id))
        case None     => isSiteExcludedAtList(tail, hash)
      }
  }

// if you use Scala 3, change this to an enum
sealed trait Exclusion
final case class SubdomainExclusion(id: Int) extends Exclusion
final case class DomainExclusion(id: Int) extends Exclusion
case object NoExclusion extends Exclusion

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  isSiteExcludedAtList(listIds, subdomainHash).flatMap {
    case Some(id) => 
      Future.successful(SubdomainExclusion(id))
    case None     => 
      isSiteExcludedAtList(listIds, domainHash).map {
        case Some(id) => DomainExclusion(id)
        case None     => NoExclusion
      }
  }.map {
    case SubdomainExclusion(id) => s"subdomain exclusion $id"
    case DomainExclusion(id)    => s"domain exclusion: $id"
    case NoExclusion            => "no exclusion"
  }
  3. The third possibility is that, instead of using Future.traverse and asking for each listId separately, you implement a query that returns all excluded ids for a given hash (subdomainHash or domainHash), and then just check whether the intersection of your listIds and the ids returned by that query is non-empty. The code is similar to that of my first answer, but it makes at most two calls to the database. I'm mentioning this because, in my experience, it's a common pattern when dealing with databases: we have some already written queries, and as our code becomes more complex we start to use those queries in loops, which leads to sub-optimal performance, while we could instead write a slightly more complex query and call it only once.
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

// mock database call
def isSiteExcludedAtListBulk(siteId: Int): Future[Set[Int]] = 
  Future { Set(10, 20, 30) }

// main logic
def isExcluded(listIds: List[Int], subdomainHash: Int, domainHash: Int): Future[String] =
  for {
    excludedSubIds <- isSiteExcludedAtListBulk(subdomainHash)
    subIds         =  listIds.filter(excludedSubIds)
    excludedDomIds <- if (subIds.isEmpty) 
                        isSiteExcludedAtListBulk(domainHash)
                      else 
                        Future.successful(Set.empty[Int])
    domIds         =  listIds.filter(excludedDomIds)
  } yield 
    if (subIds.nonEmpty)
      s"its subdomain exclusion with results: ${subIds}"
    else if (domIds.nonEmpty)
      s"its domain exclusion with results: ${domIds}"
    else
      "no exclusion"