Home > Software engineering >  Returning a Stream of Entities using a Transactor inside a Resource in Doobie
Returning a Stream of Entities using a Transactor inside a Resource in Doobie

Time:08-24

I'm trying to implement a query that returns the extracted information inside a fs2.Stream. I defined the Jobs algebra:

trait Jobs[F[_]] {
  def all(): fs2.Stream[F, Job]
}

Then, I implemented an interpreter for the algebra:

final class LiveJobs[F[_]: MonadCancelThrow](postgres: Resource[F, Transactor[F]]) extends Jobs[F] {
  override def all(): fs2.Stream[F, Job] = for {
    jobs <- postgres.use { xa =>
      sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
    }
  } yield jobs
}

However, the compiler yells because the types are not aligned:

type mismatch;
[error]  found   : fs2.Stream[[_]F[_],Job]
[error]  required: F[?]
[error]       sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
[error]                                                         ^
[error] one error found

The Resource.use method needs a function that produces an F[*], not an fs2.Stream[F, Job]. I cannot find anything that lets me convert between the two types or a different way to use the postgres resource.

CodePudding user response:

The following is probably the design you want to follow:

trait Jobs[F[_]] {
  def all: fs2.Stream[F, Job] =
}

object Jobs {
  // I am not exactly sure which typeclass you require here, so i will use Async
  def live[F[_]](implicit ev: Async[F]): Resource[F, Jobs[F]] = {
    val transactor: Resource[F, Transactor[F]] = ... // Whatever you already have here.
    transactor.map(xa => new LiveJobs(xa))
  }
}

private[pckg] final class LiveJobs[F[_]](xa: Transactor[F])(implicit ev: MonadCancelThrow[F]) extends Jobs[F] {
  override final val all: fs2.Stream[F, Job] =
    sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
}

Also, my personal advice, stick to concrete IO while learning; and maybe even after. The whole F[_] thing will just cause more trouble than worth originally.

  • Related