0

假设我有以下简单的图表。

class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] {

  val out = Outlet[A]("KafkaSource.out")

  override val shape = SourceShape.of(out)

  override def createLogic(attr: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          push(out, kI.next)
        }
      })
    }
}

val g = GraphDSL.create(){ implicit b =>
  val source = b.add(new KafkaSource[Message](itr))
  val sink = b.add(Sink.foreach[Message](println))

  source ~> sink
  ClosedShape
}

我们正在运行它

RunnableGraph.fromGraph(g).run()

我希望通知 kafkaSource 停止(或人为完成)而不是推动下一个可用元素,以便下游连接的阶段也停止。

我该如何做到这一点?

场景是,我们在 kafka 中有数百万条消息,我们希望每天晚上 9 点(例如)停止处理消息,并假设我们正在通过干净关闭来停止正在运行的应用程序。

4

1 回答 1

0

虽然可能不再与 phantomastray 相关,但对其他人有帮助:

KillSwitch 允许从外部完成 FlowShape 图形。它由一个流元素组成,该元素可以链接到需要完成控制的 FlowShape 图形。KillSwitch 特征允许完成或失败图表。[来源:Akka 文档]

于 2017-01-20T12:51:57.227 回答