0

源(从kafka读取)->keyby()-> window()-> transform(agg)-> dbSink(提交到数据库)。这工作正常。

现在我有一个用例,我需要将提交给 DB 的数据写入另一个 kafka 主题以进行一些事件处理。如果提交到数据库失败,这些记录将被删除。我需要的是:source(从kafka读取)->keyby()-> window()-> transform(agg)-> dbSink(提交到数据库)->keyby()->window()->kafkaSink() .

接收器运营商不支持 Sideouputs。我能想到的一件事:将 dbSink 转换为 flatMap()。在此 CustomDbFlatMap 中,提交到存储库,然后添加到收集器。这是实现这一目标的唯一方法吗?

4

0 回答 0