3

我是 akka 流的新手。我使用 kafka 作为源(使用 ReactiveKafka 库)并通过流对数据进行一些处理,并使用订阅者(EsHandler)作为接收器。

现在我需要处理错误并通过错误处理程序将其推送到不同的 kafka 队列。我正在尝试将 EsHandler 用作发布者和订阅者。我不确定如何将 EsHandler 作为中间人而不是接收器。

这是我的代码:

val publisher = Kafka.kafka.consume(topic, "es", new StringDecoder())

val flow = Flow[String].map { elem => JsonConverter.convert(elem.toString()) }

val sink = Sink.actorSubscriber[GenModel](Props(classOf[EsHandler]))

Source(publisher).via(flow).to(sink).run()


class EsHandler extends ActorSubscriber with ActorPublisher[Model] {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      context.actorOf(Props(classOf[EsStorage], self)) ! msg

    case OnError(err: Exception) =>
      context.stop(self)

    case OnComplete =>
      context.stop(self)

    case Response(msg) =>
      if (msg.isError()) onNext(msg.getContent())
  }
}

class ErrorHandler extends ActorSubscriber {

  val requestStrategy = WatermarkRequestStrategy(100)

  def receive = {
    case OnNext(msg: Model) =>
      println(msg)
  }
}
4

1 回答 1

3

我们强烈建议不要实现你自己的处理器(这是反应流规范给“订阅者&&发布者”的名称。很难做到正确,这就是为什么没有发布者直接作为辅助特征公开的原因。

相反,大多数时候您会希望使用提供给您的Sources/ Sinks(或Publishers/ Subscribers)并在它们之间运行您的操作,例如 map/filter 等步骤。

实际上,您可以使用现有的 Kafka Sources 和 Sinks 实现,它称为reactive-kafka并由Reactive Streams TCK验证,因此您可以相信它是有效的实现。

于 2015-07-07T15:19:05.663 回答