0

我正在运行从 Kafka 读取的 akka 流,并且我想在文件序列化成功时将消息提交回 Kafka。但我不知道如何通知上游阶段下游失败。

现在,我用FanOutShape2[ConsumerMessage.CommittableMessage[Array[Byte], Array[Byte]], ConsumerRecord[Array[Byte], Array[Byte]], ConsumerMessage.CommittableOffsetBatch].

当下游接收器完成消耗所有每小时分组时,我想推送到偏移提交出口ConsumerRecords,但是当文件无法正确完成时,我想保留这些提交。

因此,鉴于这个简化的场景,假设我有以下流

  Source(List("one", "two"))
    .map(ByteString(_))
    .runWith(FileIO.toPath(Paths.get("/file-in-root-will-fail.txt")))

这将以IOResult(0,Failure(java.nio.file.AccessDeniedException: /file-in-root-will-fail.txt)).

我如何通知上游阶段发生了这种情况?

4

0 回答 0