基本上我试图使用akka http从数据库中获取数据。如果我直接在 api 中通过 (EmployeeRepo.findAll()) 那么它会显示所有数据,但是在使用 actor 时它会显示转换错误....这里仅用于数据获取的问题请解决
这是我的员工回购-----------------------------------
package org.repo
import org.data.Employee
import org.db.DbConfig
import org.mongodb.scala.MongoCollection
import org.utils.JsonUtils
import org.mongodb.scala.bson.conversions.Bson
import org.mongodb.scala.model.Filters.equal
import org.mongodb.scala.model.FindOneAndUpdateOptions
import org.mongodb.scala.model.Updates.{combine, set}
import scala.concurrent.Future
object EmployeeRepo extends JsonUtils{
private val employeeDoc:MongoCollection[Employee]=DbConfig.employees
def createCollection()={
DbConfig.database.createCollection("employee").subscribe(
(result)=>println(s"$result"),
e=>println(e.getLocalizedMessage),
()=>println("complete")
)
}
def insertData(emp:Employee)={
employeeDoc.insertOne(emp).toFuture()
}
def findAll()= employeeDoc.find().toFuture()
def update(emp:Employee,id:String)=
employeeDoc
.findOneAndUpdate(equal("_id", id),
setBsonValue(emp),
FindOneAndUpdateOptions().upsert(true)).toFuture()
def delete(id:String)=
employeeDoc.deleteOne(equal("_id",id)).toFuture()
private def setBsonValue(emp:Employee)={
combine(
set("name",emp.name),
set("dateOfBirth",emp.dateOfBirth)
)
}
}
============EMployeeService------------ 包org.service
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.util.UUID
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source
import org.data.Employee
import org.domain.EmployeeRequest
import org.repo.EmployeeRepo
import scala.concurrent.Future
class EmployeeService {
implicit val system=ActorSystem("Employeee")
implicit val materializer=ActorMaterializer
import system.dispatcher
def saveEmployeeData= (employeeRequest:EmployeeRequest) => {
val employeeDoc:Employee=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.insertData(employeeDoc)
}
def findAll={
EmployeeRepo.findAll()
}
def update(employeeRequest: EmployeeRequest,id:String)={
val employeeDoc=employeeMapperWithNewId(employeeRequest)
EmployeeRepo.update(emp = employeeDoc,id)
}
def delete(id:String)=EmployeeRepo.delete(id)
private def employeeMapperWithNewId(employee:EmployeeRequest)={
Employee(name=employee.name,dateOfBirth = LocalDate.parse(employee.dateOfBirth, DateTimeFormatter.ISO_DATE),
_id=UUID.randomUUID.toString)
}
}
------雇员演员-----
package org.actor
import akka.actor.{Actor, ActorLogging}
import org.actor.EmployeeActor.{Delete, Save, SearchAll, Update}
import org.data.Employee
import org.domain.EmployeeRequest
import org.service.EmployeeService
object EmployeeActor {
case class Save(emp: EmployeeRequest)
case object SearchAll
case class Update(emp: EmployeeRequest, id: String)
case class Delete(id: String)
}
class
EmployeeActor extends Actor with ActorLogging {
private val employeeService: EmployeeService = new EmployeeService()
override def receive: Receive = {
case Save(emp) =>
log.info(s"recevied msg saved with employee :$emp")
sender() ! employeeService.saveEmployeeData(emp)
case SearchAll =>
log.info("received msg find ALL")
sender() ! employeeService.findAll
case Update(emp, id) =>
log.info(s"received update msg for id $id and employee $emp")
sender() ! employeeService.update(emp, id)
case Delete(id) =>
log.info(s"received msg for deleting employee of id $id")
sender() ! employeeService.delete(id)
case _ =>
log.debug("Unhandled msg")
}
}
---------------员工路线----
package org
import akka.actor.{ActorSystem, Props}
import akka.http.scaladsl.model.{ContentTypes, HttpEntity, StatusCodes}
import akka.stream.ActorMaterializer
import org.actor.EmployeeActor
import org.utils.{JsonUtils, TimeUtils}
import akka.http.scaladsl.server.Directives._
import akka.{NotUsed, util}
import akka.util.{ByteString, Timeout}
import org.data.Employee
import org.domain.EmployeeRequest
import akka.pattern.{Patterns, ask}
import akka.stream.scaladsl.Source
import org.actor.EmployeeActor.{Delete, Save, SearchAll, Update}
import org.service.EmployeeService
import scala.concurrent.duration._
import spray.json._
import scala.concurrent.{Await, Future}
class EmployeeRoute extends JsonUtils{
implicit val system=ActorSystem("Employee")
implicit val materializer=ActorMaterializer
import system.dispatcher
val actor=system.actorOf(Props[EmployeeActor],"employeeActor")
val employeeService=new EmployeeService()
implicit val timeOut=Timeout(5.seconds)
val getRoute={
pathPrefix("employee"){
(pathEndOrSingleSlash & get){
complete((actor ? SearchAll).mapTo[Seq[EmployeeRequest]])
}~
( path("update") & put & parameter("id".as[String])){id=>
entity(as[EmployeeRequest]){employee=>
complete((actor ? Update(employee,id)).map(_=>StatusCodes.OK))
}
}~
post{
entity(as[EmployeeRequest]) { employee =>
complete((actor ? Save(employee)).map(_ => StatusCodes.OK))
}
}~
delete{
(path(Segment) |parameter("id".as[String])){id=>
complete((actor ? Delete(id)).map(_=>StatusCodes.OK))
}
}
}
}
}
=================错误===============
[ERROR] [09/09/2020 19:46:48.551] [web-app-akka.actor.default-dispatcher-4] [akka.actor.ActorSystemImpl(web-app)] Error during processing of request: 'Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq'. Completing with 500 Internal Server Error response. To change default exception handling behavior, provide a custom ExceptionHandler.
java.lang.ClassCastException: Cannot cast scala.concurrent.impl.Promise$Transformation to scala.collection.immutable.Seq
at java.base/java.lang.Class.cast(Class.java:3734)
at scala.concurrent.Future.$anonfun$mapTo$1(Future.scala:464)
at scala.concurrent.impl.Promise$Transformation.run(Promise.scala:430)
at scala.concurrent.ExecutionContext$parasitic$.execute(ExecutionContext.scala:164)
at scala.concurrent.impl.Promise$Transformation.submitWithValue(Promise.scala:392)
at scala.concurrent.impl.Promise$DefaultPromise.submitWithValue(Promise.scala:299)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete0(Promise.scala:249)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:242)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:615)
at org.actor.EmployeeActor$$anonfun$receive$1.applyOrElse(EmployeeActor.scala:32)
at akka.actor.Actor.aroundReceive(Actor.scala:537)
at akka.actor.Actor.aroundReceive$(Actor.scala:535)
at org.actor.EmployeeActor.aroundReceive(EmployeeActor.scala:22)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:577)
at akka.actor.ActorCell.invoke(ActorCell.scala:547)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
at akka.dispatch.Mailbox.run(Mailbox.scala:231)
at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1016)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1665)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1598)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
请帮帮我..