我是 Spray 的新手,并试图实现一个从服务器接收数据流的客户端。我当前的代码如下所示。客户端向 HTTP 服务器发送一个请求,然后该服务器返回一个数据流(作为分块响应)。我检查了它是否连接到服务器并可以得到响应。
但是,我不清楚应该如何处理断开连接和重新连接。例如,(A)如果我失去了网络连接或(B)我的客户端超时,因为当时服务器可能没有任何数据要发送。任何指针/示例将不胜感激。
更新
首先,我想检测上面的事件(A)和(B)。当客户端遇到上述 (A) 或 (B) 时,它应该重新建立连接并重新进行身份验证,以便它可以继续(返回connected
状态以获取数据流。
import spray.http._
import spray.client.pipelining._
import akka.actor._
import spray.can.Http
import akka.io.IO
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart
import spray.http.HttpRequest
import spray.http.ChunkedResponseStart
trait Authorization {
def authorize: HttpRequest => HttpRequest
}
trait OAuthAuthorization extends Authorization {
import OAuth._
val consumer = ???
val token = ???
val authorize: (HttpRequest) => HttpRequest = OAuthorizer(token, token)
}
class StreamerActor(uri: Uri) extends Actor with ActorLogging {
this: Authorization =>
val io = IO(Http)(context.system)
//Initial state of the Actor
def receive = ready
def ready: Receive = {
case query: String =>
val body = HttpEntity(ContentType(MediaTypes.`application/x-www-form-urlencoded`), s"$query")
val req = HttpRequest(HttpMethods.POST, uri = uri, entity = body) ~> authorize
sendTo(io).withResponsesReceivedBy(self)(req)
//As soon as you get the data you should change state to "connected" by using a "become"
context become connected
}
def connected: Receive = {
case ChunkedResponseStart(_) => log.info("Chunked Response started.")
case MessageChunk(entity, _) => log.info(entity.asString)
case ChunkedMessageEnd(_, _) => log.info("Chunked Message Ended")
case Http.Closed => log.info("HTTP closed")
case _ =>
}
}
object SprayStreamer extends App {
val system = ActorSystem("simple-spray-http")
val Uri = Uri("https://.....")
val streamClient = system.actorOf(Props(new StreamerActor(Uri) with OAuthAuthorization), name = "spray-client")
streamClient ! "keyword"
}
这些是我的内容resources/application.conf
spray {
can.server {
idle-timeout = 90 s
request-timeout = 80 s
connection-timeout = 90 s
reqiest-chunk-aggregation-limit = 0
}
can.client {
idle-timeout = 90 s
request-timeout = 80 s
connection-timeout = 90 s
response-chunk-aggregation-limit = 0
}
io.confirm-sends = on
}