我知道使用 ActorMaterialzer 上的监督策略可以重新启动 akka-stream
val decider: Supervision.Decider = {
case _: ArithmeticException => Supervision.Resume
case _ => Supervision.Stop
}
implicit val materializer = ActorMaterializer(
ActorMaterializerSettings(system).withSupervisionStrategy(decider))
val source = Source(0 to 5).map(100 / _)
val result = source.runWith(Sink.fold(0)(_ + _))
// the element causing division by zero will be dropped
// result here will be a Future completed with Success(228)
来源:http ://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-error.html
我有以下用例。
/***
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-http-experimental" % "2.4.2",
"com.typesafe.akka" %% "akka-http-spray-json-experimental" % "2.4.2"
)
*/
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import Uri.Query
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.util.{Success, Failure}
import scala.concurrent.Await
import scala.concurrent.duration.Duration
import scala.concurrent.Future
object SO extends DefaultJsonProtocol {
implicit val system = ActorSystem()
import system.dispatcher
implicit val materializer = ActorMaterializer()
val httpFlow = Http().cachedHostConnectionPoolHttps[HttpRequest]("example.org")
def search(query: Char) = {
val request = HttpRequest(uri = Uri("https://example.org").withQuery(Query("q" -> query.toString)))
(request, request)
}
case class Hello(name: String)
implicit val helloFormat = jsonFormat1(Hello)
val searches =
Source('a' to 'z').map(search).via(httpFlow).mapAsync(1){
case (Success(response), _) => Unmarshal(response).to[Hello]
case (Failure(e), _) => Future.failed(e)
}
def main(): Unit = {
Await.result(searches.runForeach(_ => println), Duration.Inf)
()
}
}
有时查询将无法解组。我想在
https://example.org/?q=v
不重新启动整个字母表的情况下对该单个查询使用重试策略。