fs2 stream does not interrupt on Deferred

Time:12-29

The fs2 stream does not interrupt here:

import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global

val test = for {
      cancel <- Deferred[IO, Either[Throwable, Unit]]
      _ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) }).start
      _ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain
    } yield ()

test.unsafeRunSync()

but it interrupts if we swap the lines and fibers:

import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*
import cats.effect.unsafe.implicits.global

val test = for {
      cancel <- Deferred[IO, Either[Throwable, Unit]]
      _ <- Stream.awakeEvery[IO](1.second).map(x => println(x)).interruptWhen(cancel).compile.drain.start
      _ <- (IO.unit.delayBy(5.seconds).map { _ => println("Completing deferred"); cancel.complete(Right(())) })
    } yield ()

test.unsafeRunSync()

I wonder why...

CodePudding user response:

The issue is that you are not using IO properly.
Remember an IO[A] is just a program description, a value. It does nothing on its own.

When you call cancel.complete you only create a new program; it does nothing unless you compose it with the programs that actually run. Here you return it from map, which does not sequence it: the result is an IO[IO[Boolean]] whose inner program is never executed, so your cancel is lost. The start just spawns a fiber that builds that program and discards it.
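The difference between map and flatMap can be seen in a small sketch. The names runBoth and counter are made up for this illustration, and a Ref is used only to count how often the inner effect actually runs:

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global

// Hypothetical helper: returns the counter value after the map step
// and after the flatMap step.
def runBoth: (Int, Int) = {
  val program = for {
    counter <- Ref.of[IO, Int](0)
    // map: builds an IO[IO[Unit]]; the inner update is created but never executed
    _ <- IO.unit.map(_ => counter.update(_ + 1))
    afterMap <- counter.get
    // flatMap: sequences the inner update, so it actually runs
    _ <- IO.unit.flatMap(_ => counter.update(_ + 1))
    afterFlatMap <- counter.get
  } yield (afterMap, afterFlatMap)

  program.unsafeRunSync()
}
```

Under these assumptions runBoth should return (0, 1): the update returned from map is dropped, while the one returned from flatMap is executed.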

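In the second example the same mistake is still there: the last generator of a for comprehension is translated to map, not flatMap, so the cancel.complete program is discarded again. The stream only seems to be interrupted, most likely because the main program finishes after five seconds and the application exits while the started fiber is still running.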

The quick solution is to use flatMap rather than map in the first example. But, IMHO, a better solution is using proper combinators like this:
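For reference, the quick fix might look roughly like the sketch below. It keeps the shape of the first example; the name fixedTest and the delay and tick parameters are not from the question and were added only so the timings can be changed:

```scala
import cats.effect.{Deferred, IO}
import cats.effect.unsafe.implicits.global
import fs2.Stream
import scala.concurrent.duration.*

// Hypothetical variant of the first example, using flatMap instead of map.
def fixedTest(delay: FiniteDuration, tick: FiniteDuration): IO[Unit] =
  for {
    cancel <- Deferred[IO, Either[Throwable, Unit]]
    // flatMap sequences cancel.complete, so it really runs once the delay has passed
    completeLater = IO.unit.delayBy(delay).flatMap { _ =>
      IO.println("Completing deferred") >> cancel.complete(Right(()))
    }
    _ <- completeLater.start
    // evalMap runs the printing as an effect instead of a side effect inside map
    _ <- Stream
      .awakeEvery[IO](tick)
      .evalMap(elapsed => IO.println(elapsed))
      .interruptWhen(cancel)
      .compile
      .drain
  } yield ()
```

Running fixedTest(5.seconds, 1.second).unsafeRunSync() corresponds to the timings in the question; the stream should stop shortly after the deferred is completed.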

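import cats.effect.*
import fs2.Stream
import scala.concurrent.duration.*

val run: IO[Unit] =
  Deferred[IO, Either[Throwable, Unit]].flatMap { cancelToken =>
    // Completes the deferred after 5 seconds; >> sequences the steps so all of them run
    val cancel =
      IO.sleep(5.seconds) >>
      IO.println("Completing deferred") >>
      cancelToken.complete(Right(()))

    // Prints the elapsed time every second until cancelToken is completed
    val program =
      IO.println("Starting stream") >>
      Stream
        .awakeEvery[IO](1.second)
        .foreach(x => IO.println(x))
        .interruptWhen(cancelToken)
        .compile
        .drain >>
      IO.println("Stream finished")

    // Runs cancel on a background fiber while program runs
    cancel.background.surround(program)
  }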

