1

我有以下流的路径-

 kafkaStream[message] -> 
 kafkaStream[message] -> mergedKafkaStream[message] -> stream[EnrichedMessage] -> I/O
 kafkaStream[message] -> 

我不确定如何以 akka 流的方式编写它。我试过跟随(伪)。

KafkaStream extends ActorPublisher[message] {

}

IOHandler extends ActorSubscriber {

}

k1、k2、k3 是 kafka 流发布者

f = Flow[message].map(_.enrichMessage)

FlowGraph { b =>
  k1 ~> merge
  k2 ~> merge
  k3 ~> merge
  merge ~> f ~> ioHandlerSink
}

所以这就是我将发布者连接到接收器的方式。但是这里我要解决的问题是缓慢的 IO。IOHandler 演员处理消息的速度非常慢,所以我如何拥有多个 IOHandler 并且我应该能够分配任务。而且我还想保持背压,所以不要使用火,忘记使用路由器。

我对akka流很陌生,所以建议我一条出路。

谢谢

4

1 回答 1

-1

您可以IOHandler使用Balance.FlowGraph

于 2015-02-08T15:19:04.650 回答