I'm trying to use a for comprehension to run some futures in order and merge their results, but also to kick off a separate thread after one of those futures completes without caring about its result (basically to fire off some logging info).
I've played around with this using some thread sleeps, and it looks like whatever I put inside the for block ends up blocking the thread.
private def testFunction(): EitherT[Future, Error, Response] =
  for {
    firstRes <- EitherT(client.getFirst())
    secondRes <- EitherT(client.getSecond())
    // Future I want to run on a separate async thread outside the comprehension
    _ = runSomeLogging(secondRes)
    thirdRes <- EitherT(client.getThird(secondRes.value))
  } yield thirdRes

def runSomeLogging(secondRes: SecondRes): Future[Either[Error, Response]] = {
  Thread.sleep(10000)
  Future.successful(Right(Response("123")))
}
So the code above waits the full 10 seconds before returning the thirdRes result. My hope was to kick off the runSomeLogging function on a separate thread once secondRes is available. I thought using = rather than <- would do that, but it doesn't.
The way I was able to get this to work is below. Basically, I run my second future outside of the comprehension and use .onComplete on the previous future, so that my logging only runs if certain conditions from the comprehension above were met. In my example here, I only want to run this logging function if the secondRes call is successful.
private def runSomeLogging(response: EitherT[Future, Error, Response]): Unit =
  response.value.onComplete {
    case Success(either) =>
      either.fold(
        _ => (),
        _ => {
          // simulated slow logging; runs on the callback's thread, not the caller's
          Thread.sleep(10000)
          logThing()
        }
      )
    case _ => ()
  }
private def testFunction(): EitherT[Future, Error, Response] = {
  val res = for {
    firstRes <- EitherT(client.getFirst())
    secondRes <- EitherT(client.getSecond())
    thirdRes <- EitherT(client.getThird(secondRes.value))
  } yield thirdRes
  runSomeLogging(res)
  res
}
This second example works fine and does what I want: it doesn't block the for comprehension from returning for 10 seconds. However, because the logging depends on only some pieces of the comprehension, not all of them, I was hoping there was a way to kick off the job from within the for comprehension itself, but have it run on its own thread without blocking the comprehension from completing.
Answer:
You need a function that starts the Future but doesn't return it, so the for-comprehension can move on (Future's map/flatMap functions won't continue to the next step until the current step has finished). In your first example, runSomeLogging calls Thread.sleep on the calling thread before it ever creates a Future, which is why the = line blocked. To accomplish a "start and forget", you need to use a function that returns immediately, or a Future that resolves immediately.
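One way to see why the `_ = runSomeLogging(secondRes)` line in the question blocks is to look at roughly how a for-comprehension is rewritten. The sketch below uses plain `Future` rather than `EitherT` to stay dependency-free. `blockingLog`, `sugared`, `desugared`, and `timed` are hypothetical names made up for illustration, and the desugared form is a simplification of what the compiler actually emits.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object ForDesugarDemo {
  // Hypothetical stand-in for the question's logging call: it sleeps on the
  // calling thread *before* it returns a Future, so it is not asynchronous.
  def blockingLog(delayMs: Long): Future[Unit] = {
    Thread.sleep(delayMs)
    Future.successful(())
  }

  // The shape the question used, reduced to plain Future.
  def sugared(delayMs: Long): Future[Int] =
    for {
      a <- Future(1)
      _ = blockingLog(delayMs)
      b <- Future(a + 1)
    } yield b

  // Roughly the rewritten form: `_ = expr` is evaluated inside a map
  // callback, so `expr` must return before the following flatMap runs.
  def desugared(delayMs: Long): Future[Int] =
    Future(1)
      .map { a => val ignored = blockingLog(delayMs); a }
      .flatMap { a => Future(a + 1).map(b => b) }

  // Awaits a future and reports how long that took, in milliseconds.
  def timed[A](f: => Future[A]): (A, Long) = {
    val start = System.nanoTime()
    val result = Await.result(f, 10.seconds)
    (result, (System.nanoTime() - start) / 1000000)
  }
}
```

With this sketch, `ForDesugarDemo.timed(ForDesugarDemo.sugared(300))` and the `desugared` variant should both take at least about 300 ms, because the sleep happens inside the `map` callback and nothing after it can start until that callback returns.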
// this function will return immediately
def runSomeLogging(res: SomeResult): Unit = {
  // since startLoggingFuture uses Future.apply, calling it will start the Future,
  // but we ignore the actual Future by returning Unit instead
  startLoggingFuture(res)
}

// this function returns a future that takes 10 seconds to resolve
private def startLoggingFuture(res: SomeResult): Future[Unit] = Future {
  // note: please don't actually do Thread.sleep in your Future's thread pool
  Thread.sleep(10000)
  logger.info(s"Got result $res")
}
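Then you could put e.g.
_ = runSomeLogging(res)
or, since your comprehension is over EitherT rather than a bare Future, the same call lifted into EitherT:
_ <- EitherT.liftF[Future, Error, Unit](Future { runSomeLogging(res) })
in your for-comprehension.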
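As a rough end-to-end illustration of the `_ = runSomeLogging(res)` variant, here is a minimal sketch using only the standard library. It assumes plain `Future[Int]` steps in place of the question's `EitherT` client calls; the `logged` flag, the `delayMs` parameter, and the `pipeline` name are made up for the example and are not part of the original code.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.concurrent.{Await, Future, blocking}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object FireAndForgetDemo {
  // Set once the background "logging" has finished, so callers can observe it.
  val logged = new AtomicBoolean(false)

  // Starts the slow work on the execution context and returns at once;
  // the Future it creates is deliberately discarded.
  def runSomeLogging(res: Int, delayMs: Long): Unit = {
    Future {
      blocking(Thread.sleep(delayMs)) // simulated slow logging
      logged.set(true)
    }
    ()
  }

  // Three sequential steps; the logging is kicked off after the second one.
  def pipeline(delayMs: Long): Future[Int] =
    for {
      first  <- Future(1)
      second <- Future(first + 1)
      _ = runSomeLogging(second, delayMs) // returns immediately
      third  <- Future(second + 1)
    } yield third

  def main(args: Array[String]): Unit = {
    val start = System.nanoTime()
    val result = Await.result(pipeline(1000), 5.seconds)
    val elapsedMs = (System.nanoTime() - start) / 1000000
    println(s"result=$result after $elapsedMs ms, logged=${logged.get}")
    // The global execution context uses daemon threads, so keep the JVM
    // alive long enough for the background logging to finish.
    Thread.sleep(1500)
    println(s"logged=${logged.get}")
  }
}
```

The pipeline's result should arrive well before the simulated logging delay has elapsed. One trade-off of discarding the Future is that any exception thrown by the logging is silently lost unless a handler such as `.failed.foreach(...)` is attached inside `runSomeLogging`.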
Note: Cats-Effect and Monix have a nice abstraction for "start but ignore result", with io.start.void and task.startAndForget respectively. If you were using IO or Task instead of Future, you could use .start.void or .startAndForget on the logging task.