我有一个 Actor 系统,它正在处理来自外部系统的连续流消息。我的系统中有以下演员。
SubscribeActor
- 这个 Actor 订阅了 Redis 通道并创建了一个新的 InferActor 并将 JSON 有效负载传递给它。InferenceActor
- 这个演员负责2a。解析负载并从 JSON 负载中提取一些值文本值。2b。调用外部REST service
将 2a 中提取的值传递给此服务。REST 服务部署在 LAN 中的不同节点上,并且在计算方面做了一些相当繁重的工作。
2b 中的外部 REST 服务是使用 Spray 客户端调用的。我测试了系统,它工作正常,直到 2a。但是,一旦我介绍了 2b。我开始出现 OutOfMemory 错误,系统最终停止运行。
目前,我有两个主要嫌疑人——
- 设计缺陷 - 我在 Actor 系统中使用 Spray 客户端的方式不正确(我是 Spray 新手)
- 由于缓慢的 REST 服务导致的延迟导致的性能问题。
在我转到 #2 之前,我想确保我正确使用了 Spray 客户端,尤其是。当我从其他演员那里打电话时。我的问题是正确/不正确/次优下面的用法?
这是调用该服务的 Web 服务 REST 客户端的代码。
trait GeoWebClient {
def get(url: String, params: Map[String, String]): Future[String]
}
class GeoSprayWebClient(implicit system: ActorSystem) extends GeoWebClient {
import system.dispatcher
// create a function from HttpRequest to a Future of HttpResponse
val pipeline: HttpRequest => Future[HttpResponse] = sendReceive
// create a function to send a GET request and receive a string response
def get(path: String, params: Map[String, String]): Future[String] = {
val uri = Uri("http://myhost:9191/infer") withQuery params
val request = Get(uri)
val futureResponse = pipeline(request)
futureResponse.map(_.entity.asString)
}
}
这是InferenceActor
调用上述服务的代码。
class InferenceActor extends Actor with ActorLogging with ParseUtils {
val system = context.system
import system.dispatcher
val restServiceClient = new GeoSprayWebClient()(system)
def receive = {
case JsonMsg(s) => {
//first parse the message to
val text: Option[String] = parseAndExtractText(s) //defined in ParseUtils trait
log.info(s"extract text $text")
def sendReq(text: String) = {
import spray.http._
val params = Map(("text" -> text))
// send GET request with absolute URI
val futureResponse = restServiceClient.get("http://myhost:9191/infer", params)
futureResponse
}
val f: Option[Future[String]] = text.map(x => sendReq(x))
// wait for Future to complete NOTE: I commented this code without any change.
/* f.foreach { r => r.onComplete {
case Success(response) => log.debug("*********************" + response)
case Failure(error) => log.info("An error has occurred: " + error.getMessage)
}
}
*/
context stop self
}
}
}