1

我有一个 Actor 系统,它正在处理来自外部系统的连续流消息。我的系统中有以下演员。

  1. SubscribeActor- 这个 Actor 订阅了 Redis 通道并创建了一个新的 InferActor 并将 JSON 有效负载传递给它。
  2. InferenceActor- 这个演员负责2a。解析负载并从 JSON 负载中提取一些值文本值。2b。调用外部REST service将 2a 中提取的值传递给此服务。REST 服务部署在 LAN 中的不同节点上,并且在计算方面做了一些相当繁重的工作。

2b 中的外部 REST 服务是使用 Spray 客户端调用的。我测试了系统,它工作正常,直到 2a。但是,一旦我介绍了 2b。我开始出现 OutOfMemory 错误,系统最终停止运行。

目前,我有两个主要嫌疑人——

  1. 设计缺陷 - 我在 Actor 系统中使用 Spray 客户端的方式不正确(我是 Spray 新手)
  2. 由于缓慢的 REST 服务导致的延迟导致的性能问题。

在我转到 #2 之前,我想确保我正确使用了 Spray 客户端,尤其是。当我从其他演员那里打电话时。我的问题是正确/不正确/次优下面的用法?

这是调用该服务的 Web 服务 REST 客户端的代码。

trait GeoWebClient {
  def get(url: String, params: Map[String, String]): Future[String]
}

class GeoSprayWebClient(implicit system: ActorSystem) extends GeoWebClient {

  import system.dispatcher

  // create a function from HttpRequest to a Future of HttpResponse
  val pipeline: HttpRequest => Future[HttpResponse] = sendReceive

  // create a function to send a GET request and receive a string response
  def get(path: String, params: Map[String, String]): Future[String] = {

    val uri = Uri("http://myhost:9191/infer") withQuery params
    val request = Get(uri)
    val futureResponse = pipeline(request)
    futureResponse.map(_.entity.asString)
  }
}

这是InferenceActor调用上述服务的代码。

class InferenceActor extends Actor with ActorLogging with ParseUtils {

  val system = context.system    
  import system.dispatcher    
  val restServiceClient = new GeoSprayWebClient()(system)    

  def receive = {

    case JsonMsg(s) => {

      //first parse the message to 
      val text: Option[String] = parseAndExtractText(s) //defined in ParseUtils trait
      log.info(s"extract text $text")

      def sendReq(text: String) = {
        import spray.http._
        val params = Map(("text" -> text))
        // send GET request with absolute URI
        val futureResponse = restServiceClient.get("http://myhost:9191/infer", params)
        futureResponse
      }

      val f: Option[Future[String]] = text.map(x => sendReq(x))

      // wait for Future to complete NOTE: I commented this code without any change. 
      /* f.foreach { r => r.onComplete {
        case Success(response) => log.debug("*********************" + response)
        case Failure(error) => log.info("An error has occurred: " + error.getMessage)
      }
      }
      */
      context stop self    

    }
  }        
}
4

1 回答 1

0

如果您的第二段代码像您说的那样阻塞,则尝试按照 akka 文档中的说明将那个未来包装在另一个未来中,那么Blocking 需要仔细管理

它应该限制该请求的资源量。

虽然看起来将 text.map 转移到不同的演员会更容易。

于 2014-10-29T23:59:39.410 回答