我需要使用以下接口创建一个函数:
import akka.kafka.scaladsl.Consumer.Control
object ItemConversionFlow {
def build(config: StreamConfig): Flow[Item, OtherItem, Control] = {
// Implementation goes here
}
我的问题是我不知道如何以适合上述界面的方式定义流程。
当我做这样的事情时
val flow = Flow[Item]
.map(item => doConversion(item)
.filter(_.isDefined)
.map(_.get)
结果类型为 Flow[Item, OtherItem, NotUsed]。到目前为止,我还没有在 Akka 文档中找到任何内容。此外,akka.stream.scaladsl.Flow 上的功能仅提供“未使用”而不是控制。如果有人能指出我正确的方向,那就太好了。
一些背景知识:我需要设置几个仅在转换部分进行区分的管道。这些管道是主流的子流,可能由于某种原因而停止(相应的消息到达某个 kafka 主题)。因此我需要控制部分。我们的想法是创建一个 Graph 模板,我只需在其中插入提到的流作为参数(返回它的工厂)。对于特定情况,我们有一个可行的解决方案。为了概括它,我需要这种流程。