1

我需要编写一个不断轮询 Web 服务器以获取命令的客户端。来自服务器的响应表明命令可用(在这种情况下响应包含命令)或没有命令可用的指令,您应该对传入命令发出新请求。

我试图弄清楚如何使用 spray-client 和 Akka 来完成它,我可以想办法做到这一点,但它们看起来都不是完成它的惯用方式。所以问题是:

让几个线程轮询同一个 Web 服务器以获取传入命令并将命令交给参与者的最明智的方法是什么?

4

2 回答 2

1

此示例使用spray-clientscala futuresAkka scheduler

实现取决于所需的行为(同时并行执行许多请求,以不同的时间间隔执行,向一个参与者发送响应以一次处理一个响应,向多个参与者发送响应以并行处理......等等)。

这个特定的例子展示了如何同时并行执行多个请求,然后在每个结果完成时对其进行处理,而无需等待同时触发的任何其他请求完成。

下面的代码将每 5 秒对 0.0.0.0:9000/helloWorld 和 0.0.0.0:9000/goodbyeWorld 并行执行两个 HTTP 请求。

在 Scala 2.10、Spray 1.1-M7 和 Akka 2.1.2 中测试:

处理周期性作业执行的实际调度代码:

// Schedule a periodic task to occur every 5 seconds, starting as soon 
// as this schedule is registered
system.scheduler.schedule(initialDelay = 0 seconds, interval = 5 seconds) {

  val paths = Seq("helloWorld", "goodbyeWorld")

  // perform an HTTP request to 0.0.0.0:9000/helloWorld and 
  // 0.0.0.0:9000/goodbyeWorld
  // in parallel (possibly, depending on available cpu and cores)
  val retrievedData = Future.traverse(paths) { path =>
    val response = fetch(path)
    printResponse(response)
    response
  }
}

辅助方法/样板设置:

// Helper method to fetch the body of an HTTP endpoint as a string
def fetch(path: String): Future[String] = {
  pipeline(HttpRequest(method = GET, uri = s"/$path"))

}

// Helper method for printing a future'd string asynchronously
def printResponse(response: Future[String]) {
  // Alternatively, do response.onComplete {...}
  for (res <- response) {
    println(res)
  }
}


// Spray client boilerplate
val ioBridge = IOExtension(system).ioBridge()
val httpClient = system.actorOf(Props(new HttpClient(ioBridge)))

// Register a "gateway" to a particular host for HTTP requests 
// (0.0.0.0:9000 in this case)
val conduit = system.actorOf(
  props = Props(new HttpConduit(httpClient, "0.0.0.0", 9000)),
  name = "http-conduit"
)

// Create a simple pipeline to deserialize the request body into a string
val pipeline: HttpRequest => Future[String] = {
  sendReceive(conduit) ~> unmarshal[String]
}

一些注意事项:

  • Future.traverse用于并行运行期货(忽略顺序)。在future 列表上使用for comprehension 将一次执行一个future,等待每个future 完成。

    // Executes `oneThing`, executes `andThenAnother` when `oneThing` is complete,
    // then executes `finally` when `andThenAnother` completes.
    for {
      oneThing <- future1
      andThenAnother <- future2
      finally <- future3
    } yield (...)
    
  • system将需要替换为您实际的 Akka 演员系统。

  • system.scheduler.schedule在这种情况下,每 5 秒执行一次任意代码块——还有一个重载版本用于调度要发送到 actorRef 的消息。

    system.scheduler.schedule(
      initialDelay = 0 seconds,
      frequency    = 30 minutes,
      receiver     = rssPoller, // an actorRef
      message      = "doit" // the message to send to the actorRef
    )
    
  • 对于您的特定情况, printResponse 可以替换为 actor send: anActorRef ! response

  • 代码示例没有考虑故障——处理故障的好地方是在 printResponse(或等效)方法中,通过使用 Future onComplete 回调:response.onComplete {...}
  • 也许很明显,但是 spray-client 可以替换为另一个 http 客户端,只需替换fetch方法和随附的 spray 代码即可。

更新:完整的运行代码示例在这里

git clone repo,签出指定的提交 sha,$ sbt run导航到0.0.0.0:9000,并在控制台中观察执行的代码sbt run——它应该打印Hello World!\n'Goodbye World!OR Goodbye World!\nHelloWorld!(由于并行 Future.traverse 执行,顺序可能是随机的)。

于 2013-03-17T08:50:50.497 回答
0

您可以使用HTML5 服务器发送事件。它在许多 Scala 框架中实现。例如在xitrum代码看起来像:

class SSE extends Controller {
  def sse = GET("/sse") {
    addConnectionClosedListener {
      // The connection has been closed
      // Unsubscribe from events, release resources etc.
    }

    future {
        respondEventSource("command1")
        //...
        respondEventSource("command2")
        //...
    }
 }

SSE 非常简单,不仅可以在浏览器中使用,而且可以在任何软件中使用。Akka 集成在 xitrum 中,我们在类似的系统中使用它。但它使用 netty 作为异步服务器,它也适合在 10-15 个线程中处理数千个请求。

因此,通过这种方式,您的客户端将保持与服务器的连接,并在连接中断时重新连接。

于 2013-03-16T13:58:28.950 回答