我正在为 Finagle 客户端重试而苦苦挣扎。由于某种原因,客户端不会在测试中重试失败的请求,即使我使用自定义分类器,它应该将除 200 之外的任何响应代码标记为 RetryableFailure。我已经尝试使用 ClientBuilder 和 Http.client 来构建客户端。
我缩小了我的代码并只测试了必要的部分。
请看一下:
MyApi.scala:
import io.circe._, io.circe.generic.auto._, io.circe.jawn._, io.circe.syntax._
import com.twitter.finagle.http._
import com.twitter.finagle.Service
import com.twitter.finagle.Codec
import com.twitter.finagle.builder._
import scala.concurrent.ExecutionContext.Implicits.global
import com.twitter.conversions.time._
import com.twitter.finagle.http.service.HttpResponseClassifier
import com.twitter.finagle.service._
import com.twitter.util.Return
class MyApi(val host: String, val protocol: String = "https") {
// def request: Service[Request, Response] = {
// val clientBuilder = ClientBuilder()
// .codec(Http())
// .responseClassifier(classifier)
// .noFailureAccrual
// .requestTimeout(30.seconds)
// .hostConnectionLimit(5)
// .tcpConnectTimeout(5.seconds)
// .retryBudget(budget)
// .retries(5)
// protocol match {
// case "https" => clientBuilder.hosts(s"$host:443").tls(host).build()
// case _ => clientBuilder.hosts(host).build()
// }
// }
val budget: RetryBudget = RetryBudget(
ttl = 10.seconds,
minRetriesPerSec = 5,
percentCanRetry = 0.1
)
val classifier: ResponseClassifier = {
case ReqRep(_, Return(r: Response)) if r.statusCode == 200 =>
ResponseClass.Success
case _ => ResponseClass.RetryableFailure
}
def request: Service[Request, Response] = com.twitter.finagle.Http.client
.withResponseClassifier(classifier)
.withRetryBudget(budget)
.withRetryBackoff(Backoff.exponentialJittered(2.seconds, 32.seconds))
.newService(host)
def requestUsers = request(users(1))
val users: (Int => Request) = (n) => RequestBuilder()
.url(s"$protocol://$host/api?results=$n")
.buildGet()
}
object MyApi {
def apply(host: String, protocol: String) = new MyApi(host, protocol)
}
MyApiSpec.scala:
import com.twitter.util.Return
import com.twitter.finagle.http.service.HttpResponseClassifier
import org.scalatest._
import org.scalatest.concurrent._
import org.scalamock.proxy.ProxyMockFactory
import scala.language.postfixOps
import com.twitter.util.{Await}
import org.scalamock.scalatest.MockFactory
import com.twitter.finagle.http._
import org.scalamock.proxy.ProxyMockFactory
import io.finch._
import com.twitter.finagle.Http
import scala.io.Source
import com.twitter.io.{Reader, Buf}
import com.twitter.finagle.ListeningServer
class MyApiSpec extends FlatSpec with Matchers with MockFactory with ScalaFutures with ProxyMockFactory with BeforeAndAfter with BeforeAndAfterAll {
var server: ListeningServer = _
val ru = MyApi("localhost:1490", "http")
after {
server.close()
}
var attempts = 0
val failure: Endpoint[String] = get("api") {
println(attempts)
if (attempts > 1) {
Ok("test message")
}
else {
attempts += 1
BadGateway(new Exception("try again"))
}
}
it should "avoid network issues by retrying" in {
server = Http.server
.serve(":1490", failure.toServiceAs[Text.Plain])
val users = Await.result(ru.requestUsers).contentString
assert(users == "test message")
}
}
构建.sbt:
name := "myapi"
version := "1.0"
scalaVersion := "2.11.7"
libraryDependencies += "org.scalactic" %% "scalactic" % "2.2.6"
libraryDependencies += "org.scalatest" %% "scalatest" % "2.2.6" % "test"
libraryDependencies += "org.scala-lang.modules" %% "scala-parser-combinators" % "1.0.2"
val circeVersion = "0.4.1"
libraryDependencies ++= Seq(
"io.circe" %% "circe-core",
"io.circe" %% "circe-generic",
"io.circe" %% "circe-parser"
).map(_ % circeVersion)
libraryDependencies ++= Seq(
"com.github.finagle" %% "finch-core" % "0.11.0-M2",
"com.github.finagle" %% "finch-circe" % "0.11.0-M2"
)
libraryDependencies += "com.typesafe" % "config" % "1.3.0"
libraryDependencies += "com.typesafe.slick" %% "slick" % "3.1.1"
libraryDependencies += "com.typesafe.slick" %% "slick-hikaricp" % "3.1.0"
libraryDependencies += "com.typesafe.slick" %% "slick-testkit" % "3.1.1" % "test"
libraryDependencies += "org.postgresql" % "postgresql" % "9.4-1206-jdbc4"
libraryDependencies += "junit" % "junit" % "4.8.1" % "test"
libraryDependencies += "org.scalamock" %% "scalamock-scalatest-support" % "3.2.2" % "test"
val scalazVersion = "7.2.4"
libraryDependencies ++= Seq(
"org.scalaz" %% "scalaz-core" % scalazVersion,
"org.scalaz" %% "scalaz-effect" % scalazVersion,
"org.scalaz" %% "scalaz-scalacheck-binding" % scalazVersion % "test"
)
scalacOptions += "-feature"
initialCommands in console := "import scalaz._, Scalaz._"