5

我有一个负责处理 http 调用的 Akka 演员。我使用 scala dispatch 通过 API 发送多个 HTTP 请求:

urls.foreach { u
  val service = url(u)
  val promise = Http(service OK as.String).either
  for(p <- promise)
  {
     p match
     {
       case Left(error) =>
         faultHandler(error)
       case Right(result) =>
         resultHandler(result)
     }
  }

resultHandler函数中,我增加一个实例变量nbOfResults并与我完成的调用次数进行比较。

def resultHandler(result:String)
{
  this.nbOfResults++
  ...
  if(nbOfResults == nbOfCalls)
    // Do something
}

安全吗?nbOfResults如果两个调用同时返回结果,是否可以同时访问该变量?

目前,我认为actor或多或少相当于一个线程,因此回调函数不会同时执行。这是正确的吗 ?

4

4 回答 4

3

这是仅使用 dispatch 的 Alexey Romanov 响应的变体:

//Promises will be of type Array[Promise[Either[Throwable, String]]]
val promises = urls.map { u =>
    val service = url(u)

    Http(service OK as.String).either
}

//Http.promise.all transform an Iterable[Promise[A]] into Promise[Iterable[A]]
//So listPromise is now of type Promise[Array[Either[Throwable, String]]]
val listPromise = Http.promise.all(promises)

for (results <- listPromise) {
    //Here results is of type Array[Either[Throwable, String]]

    results foreach { result =>
        result match {
            Left(error) => //Handle error
            Right(response) => //Handle response
        }
    }
}
于 2012-12-03T19:21:16.290 回答
2

有一个更好的方法:

val promises = urls.map {u =>
  val service = url(u)
  val promise = Http(service OK as.String).either
}

val listPromise = Future.sequence(promises)

listPromise.onComplete { whatever }
于 2012-12-03T17:35:07.603 回答
2

我同意 Alexey Romanov 的回答。无论您选择以何种方式同步您的 http 请求,请注意您处理承诺完成的方式。您的直觉是正确的,因为并发访问可能会出现在参与者的状态上。处理此问题的更好方法是执行以下操作:

def resultHandler(result: String) {
    //on completion we are sending the result to the actor who triggered the call
    //as a message
    self ! HttpComplete(result)
}

在演员的接收功能中:

def receive = {
    //PROCESS OTHER MESSAGES HERE
    case HttpComplete(result) => //do something with the result
}

这样,您可以确保处理 http 结果不会从外部违反参与者的状态,而是从参与者的接收循环,这是正确的方法

于 2012-12-03T17:44:06.050 回答
1
val nbOfResults = new java.util.concurrent.atomic.AtomicInteger(nbOfCalls)

// After particular call was ended    
if (nbOfResults.decrementAndGet <= 0) {
  // Do something
}

[编辑] 使用 AtomicReference CAS 删除旧答案 - while(true)、compareAndSet 等

于 2012-12-03T17:57:06.570 回答