Home > Blockchain >  Slick Futures converted into Promises
Slick Futures converted into Promises

Time:01-21

I am starting to develop in Scala, so I started witha really simple RESTful API using AKKA HTTP actors and then wanted to add a PostgreSQL database to "close up" the project. The thing is that somewhere in the project, a Future that is returned by a db.run method is converted into a Promise and returning me errors. When I run the Main object and start the API and hit somewhere, I get this error: Cannot cast scala.concurrent.impl.Promise$DefaultPromise to scala.collection.immutable.Seq or Cannot cast scala.concurrent.impl.Promise$DefaultPromise to api.Item depending on which route I an hitting.

Here is the main api.scala file:

package api

import akka.actor.{Actor, ActorSystem, Props}
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import akka.http.scaladsl.model.StatusCodes
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.pattern.ask
import akka.util.Timeout
import api.Main.ItemActor._
import slick.jdbc.JdbcBackend.Database
import spray.json.DefaultJsonProtocol._

import scala.concurrent.Future
import scala.concurrent.duration.DurationInt
import scala.util.{Failure, Success}


object Main extends App {

  val db = Database.forConfig("scaladb");
  val itemDao = new handler(db)
  val system = ActorSystem("mySystem")
  val itemActor = system.actorOf(Props(new ItemActor(db)))

  implicit val actorSystem = system
  implicit val itemFormat = jsonFormat3(Item)
  implicit val timeout: Timeout = Timeout(5.seconds)

  class ItemActor(db: Database) extends Actor {
    import api.Main.ItemActor._

    def receive = {
      case CreateItem(item) =>
        sender() ! itemDao.create(item)
      case ReadItem(id) =>
        sender() ! itemDao.read(id)
      case ReadAllItems =>
        sender() ! itemDao.readAll
      case UpdateItem(item) =>
        sender() ! itemDao.update(item)
      case DeleteItem(id) =>
        sender() ! itemDao.delete(id)
    }

  }

  object ItemActor {
    case class CreateItem(item: Item)

    case class ReadItem(id: Int)

    case object ReadAllItems

    case class UpdateItem(item: Item)

    case class DeleteItem(id: Int)
  }

  def handleResponse(futureResponse: Future[Item]): Route = {
    onComplete(futureResponse) {
      case Success(response) => complete(response)
      case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
    }
  }

  def handleResponseSeq(futureResponse: Future[Seq[Item]]): Route = {
    onComplete(futureResponse) {
      case Success(response) => complete(response)
      case Failure(ex) => complete(StatusCodes.InternalServerError, s"An error occurred: ${ex.getMessage}")
    }
  }

  val routes = pathPrefix("items") {
    pathEnd {
      post {
        entity(as[Item]) { item =>
          handleResponse((itemActor ? CreateItem(item)).mapTo[Item])
        }
      } ~
      get {
        handleResponseSeq((itemActor ? ReadAllItems).mapTo[Seq[Item]])
      }
    } ~
    path(IntNumber) { id =>
      get {
        handleResponse((itemActor ? ReadItem(id)).mapTo[Item])
      } ~
      put {
        entity(as[Item]) { item =>
          handleResponse((itemActor ? UpdateItem(item)).mapTo[Item])
        }
      } ~
      delete {
        handleResponse((itemActor ? DeleteItem(id)).mapTo[Item])
      }
    }
  }

  val bindRoutes = Http().bindAndHandle(routes, "localhost", 8888)
  println("Server online at http://localhost:8888/")
}

Then the handler (Where I definde the methods that access the PostgreSQL database):

package api

import slick.jdbc.PostgresProfile.api._
import scala.concurrent.Future

class handler (db:Database){
  val items = TableQuery[Items]

  def create(item:Item): Future[Item] = {
    db.run((items returning items.map(_.id.?) into ((item, id) => item.copy(id = id)))  = item)
  }

  def read(id: Int): Future[Option[Item]] = {
    db.run(items.filter(_.id === id).result.headOption)
  }

  def readAll: Future[Seq[Item]] = {
    println((db.run(items.result)).getClass)
    db.run(items.result)
  }

  def update(item: Item): Future[Int] = {
    db.run(items.filter(_.id === item.id).update(item))
  }

  def delete(id: Int): Future[Int] = {
    db.run(items.filter(_.id === id).delete)
  }
}

And the items file:

package api
import slick.jdbc.PostgresProfile.api._

case class Item(id: Option[Int] = None, name: String, description: String)

class Items(tag: Tag) extends Table[Item](tag, "items") {
  def id = column[Int]("id", O.PrimaryKey, O.AutoInc)
  def name = column[String]("name")
  def description = column[String]("description")
  def * = (id.?, name, description) <> (Item.tupled, Item.unapply)
}

I've tried to use a getClass next to the db.run(items.result) in the handler file, and it prits class scala.concurrent.impl.Promise$DefaultPromise so it must be something of an implicit converter. Thanks.

CodePudding user response:

Here: handleResponse((itemActor ? CreateItem(item)).mapTo[Item]) Actor returns Future[Item], mapTo[Item] tries to cast it to item and fails.

You want your actor to return the actual item, not Future result from db.run. I haven't used akka in a while, but I think, something like this should work:

val replyTo = sender
...
case CreateItem(item) => itemDao.create(item).onComplete { 
   case Success(i) => replyTo ! i 
   case Failure(e) => throw e
}
...

CodePudding user response:

You're mixing Futures and actors, which is generally not a great idea.

In your ItemActor, instead of sending the future as a reply, it's a better idea to pipe the future as a reply (the reply won't actually happen until the future is complete, that is to say, the DAO has a result).

import akka.pattern.pipe

class ItemActor(db: Database) extends Actor {
  import ItemActor._
  import context.dispatcher

  def receive = {
    case CreateItem(item) =>
      itemDao.create(item).pipeTo(sender())

    case ReadItem(id) =>
      itemDao.read(id).pipeTo(sender())
  }
}

That said, at least in this code, there doesn't really seem to be a good reason for ItemActor to exist, given that it's just forwarding operations to the DAO. Making the itemDao visible in the routes, you could just as well do:

handleResponse(itemDao.create(item))
  • Related