3

开启:akka-stream-experimental_2.11 1.0。

我们在 Tcp 服务器中使用 Framing.delimiter。当消息到达的长度大于 maximumFrameLength 时,会抛出 FramingException,我们可以从 ActorSubscriber 的 OnError 中捕获它。

服务器代码:

def bind(address: String, port: Int, target: ActorRef, maxInFlight: Int, maxFrameLength: Int)
    (implicit system: ActorSystem, actorMaterializer: ActorMaterializer): Future[ServerBinding] = {
    val sink = Sink.foreach {
      conn: Tcp.IncomingConnection =>
        val targetSubscriber = ActorSubscriber[Message](system.actorOf(Props(new TargetSubscriber(target, maxInFlight))))

        val targetSink = Flow[ByteString]
          .via(Framing.delimiter(ByteString("\n"), maximumFrameLength = maxFrameLength, allowTruncation = true))
          .map(raw ⇒ Message(raw))
          .to(Sink(targetSubscriber))

        conn.flow.to(targetSink).runWith(Source(Promise().future))
    }
    val connections = Tcp().bind(address, port)
    connections.to(sink).run()
  }

订阅者代码:

class TargetSubscriber(target: ActorRef, maxInFlight: Int) extends ActorSubscriber with ActorLogging {
  private var inFlight = 0

  override protected def requestStrategy = new MaxInFlightRequestStrategy(maxInFlight) {
    override def inFlightInternally = inFlight
  }

  override def receive = {
    case OnNext(msg: Message) ⇒
      target ! msg
      inFlight += 1
    case OnError(t) ⇒
      inFlight -= 1
      log.error(t, "Subscriber encountered error")
    case TargetAck(_) ⇒
      inFlight -= 1
  }
}

问题:在该传入连接的此异常之后,低于最大帧长度的消息不会流动。杀死客户端并重新运行它工作正常。

ActorSubscriber 不尊重监督

跳过坏消息并继续下一条好消息的正确方法是什么?

4

2 回答 2

0

您是否尝试过对targetFlow水槽而不是整个物化器进行监督?我在这里的任何地方都看不到它,我相信它应该直接设置在该流程上。

尽管如此,这更像是一种猜测而不是科学;)

于 2015-08-21T22:09:24.477 回答
-1

我从文件中读取了同样的异常,对我来说,它是通过在最后一行后面加上一个 return 来解决的。

于 2021-07-01T18:17:55.540 回答