1

我有以下流:

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

它可以正常工作一段时间,我可以使用填充在 Kafka 主题上的消息。但有时,显然是在一个随机的时间间隔,没有更多的消息发布,并且这段代码没有记录任何错误(printAndByeBye 将打印传递的消息并终止actor系统。)重新启动应用程序后,消息继续流动。

关于如何知道这里发生了什么的任何想法?

编辑:我把 Kamon 放在上面,我可以看到以下行为:

每个 Actor 的邮箱大小

每个 Actor 在邮箱中的时间

每个 Actor 的处理时间

看起来有些东西在没有通知流应该停止的情况下停止了,但我不知道如何使其明确并停止流。

4

3 回答 3

0

我建议使用监督属性创建流来处理 TCP 连接中可能出现的异常,如下所示:

val flow = 
    Tcp().outgoingConnection("", 12)
          .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
          .map(_.utf8String).withAttributes(ActorAttributes.supervisionStrategy {
      case ex: Throwable =>
        println("Error ocurred: " + ex)
        Supervision.Resume
     }

Source(IndexedSeq(ByteString.empty))
.via(flow)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

如果流程有任何错误,则流将停止。使用此配置,您将看到流程是否引发了任何异常。

于 2016-12-14T15:45:24.393 回答
0

如果一切都安静下来,可能是因为在某处施加了背压。尝试并有选择地用非背压感知阶段替换您的背压感知阶段,并检查问题是否仍然存在。在您的情况下,有两种可能的背压来源:

1)TCP连接

您可以尝试将无限源附加ByteString到 Kafka,执行以下操作:

Source.cycle(() => List(???).iterator)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runWith(
  Producer.plainSink(
    ProducerSettings(system, new ByteArraySerializer, new StringSerializer)
      .withBootstrapServers(s"${kafkaAddress}:${kafkaPort}")
  )
).onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

2) 卡夫卡水槽

用一些日志记录替换它

Source(IndexedSeq(ByteString.empty))
.via(
  Tcp().outgoingConnection(bsAddress, bsPort)
    .via(Framing.delimiter(ByteString("\n"), 256, allowTruncation = true))
    .map(_.utf8String)
)
.map(m => new ProducerRecord[Array[Byte], String](kafkaTopic, m))
.runForeach(println)
.onComplete {
    case Success(Done) => printAndByeBye("Stream ends successfully")
    case Failure(ex) => printAndByeBye("Stream ends with an error: " + ex.toString)
  }

您能仅在 2 种情况中的一种中看到问题吗?同时?没有?

于 2016-12-15T16:52:30.707 回答
0

流没有失败,但由于发布数据的设备在一段时间后停止发送数据而没有断开连接,因此 TCP 流变得空闲。而不是使用更简单的:

TCP().outgoingConnection(bsAddress, bsPort)

我最终使用:

def outgoingConnection(
remoteAddress:  InetSocketAddress,
localAddress:   Option[InetSocketAddress]           = None,
options:        immutable.Traversable[SocketOption] = Nil,
halfClose:      Boolean                             = true,
connectTimeout: Duration                            = Duration.Inf,
idleTimeout:    Duration                            = Duration.Inf): Flow[ByteString, ByteString, Future[OutgoingConnection]] = ???

所以

Tcp().outgoingConnection(bsAddress, bsPort)

变成了

val connectTimeout: Duration = 1 second
val idleTimeout: Duration = 2 second
Tcp().outgoingConnection(
    remoteAddress = InetSocketAddress.createUnresolved(bsAddress, bsPort),
    connectTimeout = connectTimeout,
    idleTimeout = idleTimeout
  )

通过通知 idleTimeout,后续启动失败并且可以重新启动另一个流程。

于 2016-12-27T20:20:49.893 回答