The fs2 stream does not interrupt here:
import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global
val test = for {
cancel <- Deferred[IO, Either[Throwable, Unit]]
_ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) }).start
_ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain
} yield ()
test.unsafeRunSync()
but it interrupts if we swap the lines and fibers:
import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global
val test = for {
cancel <- Deferred[IO, Either[Throwable, Unit]]
_ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain.start
_ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) })
} yield ()
test.unsafeRunSync()
I wonder why...
CodePudding user response:
The issue is that you are not using IO
properly.
Remember an IO[A]
is just a program description, a value. It does nothing on its own.
When you call cancel.complete
you are just creating a new program, it is not doing anything unless you compose it with other programs. And you are composing it in a map
method; which doesn't really combine the programs, so your cancel
is lost, and the start
will just create a fiber that will create such a program and discard it.
In the second example, since for
translates everything to a flatMap
you ended up composing the program by accident.
The quick solution is to use flatMap
rather than map
in the first example.
But, IMHO, a better solution is using proper combinators like this:
val run: IO[Unit] =
Deferred[IO, Either[Throwable, Unit]].flatMap { cancelToken =>
val cancel =
IO.sleep(5.seconds) >>
IO.println("Completing deferred") >>
cancelToken.complete(Right(()))
val program =
IO.println("Starting stream") >>
Stream
.awakeEvery[IO](1.second)
.foreach(x => IO.println(x))
.interruptWhen(cancelToken)
.compile
.drain >>
IO.println("Stream finished")
cancel.background.surround(program)
}
You can see the code running here.