假设我有以下简单的图表。
class KafkaSource[A](kI: KafkaIterator) extends GraphStage[SourceShape[A]] {
val out = Outlet[A]("KafkaSource.out")
override val shape = SourceShape.of(out)
override def createLogic(attr: Attributes): GraphStageLogic =
new GraphStageLogic(shape) {
setHandler(out, new OutHandler {
override def onPull(): Unit = {
push(out, kI.next)
}
})
}
}
val g = GraphDSL.create(){ implicit b =>
val source = b.add(new KafkaSource[Message](itr))
val sink = b.add(Sink.foreach[Message](println))
source ~> sink
ClosedShape
}
我们正在运行它
RunnableGraph.fromGraph(g).run()
我希望通知 kafkaSource 停止(或人为完成)而不是推动下一个可用元素,以便下游连接的阶段也停止。
我该如何做到这一点?
场景是,我们在 kafka 中有数百万条消息,我们希望每天晚上 9 点(例如)停止处理消息,并假设我们正在通过干净关闭来停止正在运行的应用程序。