I'm trying to implement a query that returns the extracted information inside a fs2.Stream
. I defined the Jobs
algebra:
trait Jobs[F[_]] {
def all(): fs2.Stream[F, Job]
}
Then, I implemented an interpreter for the algebra:
final class LiveJobs[F[_]: MonadCancelThrow](postgres: Resource[F, Transactor[F]]) extends Jobs[F] {
override def all(): fs2.Stream[F, Job] = for {
jobs <- postgres.use { xa =>
sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
}
} yield jobs
}
However, the compiler yells because the types are not aligned:
type mismatch;
[error] found : fs2.Stream[[_]F[_],Job]
[error] required: F[?]
[error] sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
[error] ^
[error] one error found
The Resource.use
method needs a function that produces an F[*]
, not an fs2.Stream[F, Job]
. I cannot find anything that lets me convert between the two types or a different way to use the postgres
resource.
CodePudding user response:
The following is probably the design you want to follow:
trait Jobs[F[_]] {
def all: fs2.Stream[F, Job] =
}
object Jobs {
// I am not exactly sure which typeclass you require here, so i will use Async
def live[F[_]](implicit ev: Async[F]): Resource[F, Jobs[F]] = {
val transactor: Resource[F, Transactor[F]] = ... // Whatever you already have here.
transactor.map(xa => new LiveJobs(xa))
}
}
private[pckg] final class LiveJobs[F[_]](xa: Transactor[F])(implicit ev: MonadCancelThrow[F]) extends Jobs[F] {
override final val all: fs2.Stream[F, Job] =
sql"SELECT * FROM jobs".query[Job].stream.transact(xa)
}
Also, my personal advice, stick to concrete IO
while learning; and maybe even after.
The whole F[_]
thing will just cause more trouble than worth originally.