For Comprehension of Futures - Kicking off new thread inside comprehension and disregarding result

Time:07-26

I'm trying to use a for comprehension to run some futures in order and merge their results, but also to kick off a separate thread after one of those futures completes, without caring about its result (it is basically used to fire off some logging info).

I've experimented with this using some thread sleeps, and it looks like whatever I put inside the for block ends up blocking the thread.

private def testFunction(): EitherT[Future, Error, Response] =
    for {
      firstRes <- EitherT(client.getFirst())
      secondRes <- EitherT(client.getSecond())
      // Future I want to run on a separate thread, outside the comprehension
      _ = runSomeLogging(secondRes)
      thirdRes <- EitherT(client.getThird(secondRes.value))
    } yield thirdRes

def runSomeLogging(secondRes: SecondRes): Future[Either[Error, Response]] = {
        // The sleep runs on the calling thread, before any Future is created
        Thread.sleep(10000)
        Future.successful(Right(Response("123")))
    }

The code above waits the full 10 seconds before returning the thirdRes result. My hope was to kick off the runSomeLogging function on a separate thread once secondRes is available. I thought that using = rather than <- would do that, but it doesn't.
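A minimal sketch of why the first snippet blocks, using only scala.concurrent. The names blockingLogging, asyncLogging and millisToReturn are hypothetical, and the sleep is shortened to 500 ms. In the first function the sleep runs on the caller's thread and Future.successful merely wraps a value that is already computed; in the second, Future.apply hands the work to the execution context and returns at once.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object BlockingVsAsync {
  // Hypothetical stand-in for the question's logging call: the sleep runs on the
  // caller's thread, and only afterwards is an already-completed Future created.
  def blockingLogging(): Future[Unit] = {
    Thread.sleep(500)
    Future.successful(())
  }

  // Same work, but wrapped in Future.apply so it runs on the execution context.
  def asyncLogging(): Future[Unit] = Future {
    Thread.sleep(500)
  }

  // Measures how long the call itself takes to return, in milliseconds.
  def millisToReturn(call: () => Future[Unit]): Long = {
    val start = System.nanoTime()
    val f = call()
    val elapsed = (System.nanoTime() - start) / 1000000
    Await.ready(f, 5.seconds) // let the background work finish before returning
    elapsed
  }
}
```

Calling BlockingVsAsync.millisToReturn on each function should show the first taking roughly the full sleep time to return and the second returning almost immediately.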

The way I was able to get this to work is shown below. Basically, I build the comprehension first and then call the logging function outside of it, using .onComplete on the resulting future so that the logging only runs if certain conditions from the comprehension were met. In this example I only want to run the logging function if the secondRes step is successful.

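// Takes the EitherT built by the comprehension, since that is what testFunction passes in
private def runSomeLogging(response: EitherT[Future, Error, Response]): Unit = {
    Thread.sleep(10000)
    response.value.onComplete {
      case Success(either) =>
        either.fold(
          _ => (),         // Left: nothing to log
          _ => logThing()  // Right: fire the logging
        )
      case _ =>            // failed Future: nothing to log
    }
}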

private def testFunction(): EitherT[Future, Error, Response] = {
    val res = for {
      firstRes <- EitherT(client.getFirst())
      secondRes <- EitherT(client.getSecond())
      thirdRes <- EitherT(client.getThird(secondRes.value))
    } yield thirdRes
    // Attach the logging callback, then hand the result back to the caller
    runSomeLogging(res)
    res
}

This second example works fine and does what I want: it doesn't block the for comprehension from returning for 10 seconds. However, the logging depends on only some steps of the comprehension, not all of them, so I was hoping there was a way to kick off the job from within the for comprehension itself, let it run on its own thread, and not block the comprehension from completing.

CodePudding user response:

You need a function that starts the Future but doesn't return it, so the for-comprehension can move on (Future's map/flatMap won't continue to the next step until the current Future resolves). To get "start and forget" behaviour, use a function that returns immediately, or a Future that resolves immediately. Note also that in your first snippet nothing is asynchronous: Thread.sleep runs on the calling thread, and Future.successful only wraps a value that has already been computed.

// this function will return immediately
def runSomeLogging(res: SomeResult): Unit = {
  // since startLoggingFuture uses Future.apply, calling it will start the Future,
  // but we ignore the actual Future by returning Unit instead
  startLoggingFuture(res)
}

// this function returns a future that takes 10 seconds to resolve
private def startLoggingFuture(res: SomeResult): Future[Unit] = Future {
  // note: please don't actually do Thread.sleep in your Future's thread pool
  Thread.sleep(10000)
  logger.info(s"Got result $res")
}

Then you could put e.g.

_ = runSomeLogging(res)

or

_ <- Future { runSomeLogging(res) }

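in your for-comprehension. In an EitherT[Future, Error, _] comprehension like the one in the question, the second form has to be lifted into EitherT first, e.g. EitherT.liftF[Future, Error, Unit](Future { runSomeLogging(res) }).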
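Putting the pieces together, here is a rough, dependency-free sketch of the first option. It is written under some assumptions: plain Future[String] replaces EitherT to avoid the Cats dependency, getFirst/getSecond/getThird are hypothetical stand-ins for the client calls, and the two latches are hypothetical hooks that make the timing observable (gate plays the role of the slow logging work). The blocking { ... } wrapper tells the global execution context that the thread is about to block.

```scala
import java.util.concurrent.CountDownLatch
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FireAndForgetDemo {
  // Hypothetical hooks: `gate` holds the logging back, `logged` records that it ran.
  val gate   = new CountDownLatch(1)
  val logged = new CountDownLatch(1)

  // Returns Unit immediately; the Future keeps running on the execution context.
  def runSomeLogging(res: String): Unit = {
    Future {
      blocking { gate.await() } // stands in for slow logging work
      logged.countDown()
    }
    ()
  }

  // Hypothetical stand-ins for client.getFirst / getSecond / getThird.
  def getFirst(): Future[String] = Future.successful("first")
  def getSecond(): Future[String] = Future.successful("second")
  def getThird(in: String): Future[String] = Future.successful(s"third after $in")

  def testFunction(): Future[String] =
    for {
      _         <- getFirst()
      secondRes <- getSecond()
      _ = runSomeLogging(secondRes) // only reached if getSecond succeeded
      thirdRes  <- getThird(secondRes)
    } yield thirdRes

  // Convenience for trying it out: waits for the comprehension, not for the logging.
  def run(): String = Await.result(testFunction(), 5.seconds)
}
```

FireAndForgetDemo.run() should return while gate is still closed, i.e. before the logging has finished; the logging only completes after gate.countDown() is called. Because the logging line sits after the secondRes step, it is skipped entirely if getSecond fails.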

Note: Cats-Effect and Monix have a nice abstraction for "start but ignore the result", namely io.start.void and task.startAndForget respectively. If you were using IO or Task instead of Future, you could call one of these on the logging task.
