我有以下流的路径-
kafkaStream[message] ->
kafkaStream[message] -> mergedKafkaStream[message] -> stream[EnrichedMessage] -> I/O
kafkaStream[message] ->
我不确定如何以 akka 流的方式编写它。我试过跟随(伪)。
KafkaStream extends ActorPublisher[message] {
}
IOHandler extends ActorSubscriber {
}
k1、k2、k3 是 kafka 流发布者
f = Flow[message].map(_.enrichMessage)
FlowGraph { b =>
k1 ~> merge
k2 ~> merge
k3 ~> merge
merge ~> f ~> ioHandlerSink
}
所以这就是我将发布者连接到接收器的方式。但是这里我要解决的问题是缓慢的 IO。IOHandler 演员处理消息的速度非常慢,所以我如何拥有多个 IOHandler 并且我应该能够分配任务。而且我还想保持背压,所以不要使用火,忘记使用路由器。
我对akka流很陌生,所以建议我一条出路。
谢谢