3

我正在使用 Akka Streams Kafka 将 Kafka 消息通过管道传输到远程服务。我想保证该服务对每条消息恰好接收一次(即同时满足“至少一次”和“最多一次”投递)。

这是我想出的代码:

  /**
    * Starts a stream that consumes every Kafka topic matching `topicPattern`, asks
    * `subscriber` to process each converted record and commits the record's offset
    * only after the ask for that record has completed successfully.
    *
    * Note that `implicit` applies to the whole parameter list, so every argument is
    * resolved implicitly unless the caller passes the list explicitly.
    *
    * @param system       actor system used to build the consumer settings and the materializer
    * @param config       configuration that provides the `group-id` key
    * @param subscriber   actor that is asked to process each converted message
    * @param topicPattern pattern of the topics to subscribe to
    * @param mapCommittableMessageToSinkMessage converts a Kafka message into the message sent to `subscriber`
    * @tparam T type of the message sent to `subscriber`
    */
  private def startFlow[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                           topicPattern: String,
                           mapCommittableMessageToSinkMessage: Function[CommittableMessage[String, String], T]): Unit = {

    val groupId = config.getString("group-id")

    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId(groupId)
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Upper bound for the subscriber's reply to each ask below.
    implicit val timeout: Timeout = Timeout(5.seconds)
    import system.dispatcher // ExecutionContext for the Future callbacks below

    val streamDone = Consumer
      .committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))
      // Keep the original message next to the converted one so its offset can be committed later.
      .map(message => (message, mapCommittableMessageToSinkMessage(message)))
      // Parallelism 1: the next record is only sent once the subscriber replied to the previous one.
      .mapAsync(1) { case (original, converted) => ask(subscriber, converted).map(_ => original) }
      .mapAsync(1)(_.committableOffset.commitScaladsl())
      .runWith(Sink.ignore)

    // A failed ask (e.g. timeout) or commit fails the whole stream; report it instead of
    // silently dropping the materialized Future.
    streamDone.failed.foreach { cause =>
      system.log.error(cause, "Kafka consumer stream for topic pattern {} failed", topicPattern)
    }
  }

如代码所示,它把每条消息映射为一个元组:原始消息,以及转换后要传递给订阅者(负责把消息发送到远程服务的 actor)的消息。使用元组的目的是在订阅者完成处理之后再提交偏移量。

这种写法总让我觉得像是一种反模式,但我不确定是否有更好的做法。有什么更好的建议吗?

谢谢!

4

2 回答 2

3

让这段代码更整洁、更易于修改的一种方法是使用 GraphDSL。它允许您让图的一个分支携带消息中的 Committable 部分,而另一个分支执行所有需要的业务逻辑。

图的一个例子可能是(为了更清楚,省略了所有的样板):

// Source of committable Kafka messages for every topic matching the pattern.
val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))

// Business branch: asks the subscriber to process each converted message, one at a time.
val businessLogic = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => ask(subscriber, mapCommittableMessageToSinkMessage(message)))

// Commit branch: commits the offset of every message it receives. A Flow is attached to a
// Sink with `to`, which yields a Sink that can be wired into the graph below.
val snk = Flow[CommittableMessage[String, String]]
  .mapAsync(1)(message => message.committableOffset.commitScaladsl())
  .to(Sink.ignore)  // look into Sink.foldAsync for a more compact re-write of this part

// Wiring, to be placed inside a GraphDSL builder; `broadcast` and `zip` are the junctions
// added to that builder (omitted here as boilerplate). The zip pairs the business result
// with the original message, and only the message (`_._2`) is passed on to be committed.
src ~> broadcast
       broadcast ~> businessLogic ~> zip.in0
       broadcast         ~>          zip.in1
                                     zip.out.map(_._2) ~> snk
于 2017-03-06T21:19:18.553 回答
2

这是采用上面 @stefano-bonetti 回答中的方法后,可以正常工作的完整代码:

  /**
    * Builds and runs a graph that consumes the Kafka topics matching
    * `^<customer-id>\.<topicSuffix>$`, asks `subscriber` to process each converted
    * message and commits the message's offset once the subscriber's reply is available.
    *
    * The consumer group id is `<group-id>.<subscriber actor name>`; `group-id` and
    * `customer-id` are read from `config`. Any error inside the stream is logged and
    * stops the stream.
    *
    * Note that `implicit` applies to the whole parameter list, so every argument is
    * resolved implicitly unless the caller passes the list explicitly.
    *
    * @param system      actor system used for the consumer settings, the materializer and logging
    * @param config      configuration that provides the `group-id` and `customer-id` keys
    * @param subscriber  actor that is asked to process each converted message
    * @param topicSuffix suffix of the topic name, inserted into the topic pattern as-is
    * @param convertCommittableMessageToSubscriberMessage converts a Kafka message into the
    *                    message sent to `subscriber`
    * @tparam T type of the message sent to `subscriber`
    */
  private def startStream[T](implicit system: ActorSystem, config: Config, subscriber: ActorRef,
                             topicSuffix: String,
                             convertCommittableMessageToSubscriberMessage: Function[CommittableMessage[String, String], T]): Unit = {

    val groupId = config.getString("group-id")
    val subscriberName = subscriber.path.name
    val customerId = config.getString("customer-id")
    val topicPattern = s"^$customerId\\.$topicSuffix$$"

    implicit val materializer: ActorMaterializer = ActorMaterializer()

    val consumerSettings = ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withGroupId(s"$groupId.$subscriberName")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    // Upper bound for the subscriber's reply to each ask below.
    implicit val timeout: Timeout = Timeout(5.seconds)

    val src = Consumer.committableSource(consumerSettings, Subscriptions.topicPattern(topicPattern))

    // Business branch: one outstanding ask at a time.
    val businessLogic = Flow[CommittableMessage[String, String]]
      .mapAsync(1)(message => subscriber.ask(convertCommittableMessageToSubscriberMessage(message)))

    // Commit branch: commits the offset of every message that reaches it.
    val snk = Flow[CommittableMessage[String, String]]
      .mapAsync(1)(message => message.committableOffset.commitScaladsl())
      .to(Sink.ignore)

    // Log the failure with its cause (cause first, so the exception is not treated as a
    // template argument and dropped), then stop the stream.
    val decider: Supervision.Decider = { e =>
      system.log.error(e, "error in stream")
      Supervision.Stop
    }

    val graph = RunnableGraph.fromGraph(GraphDSL.create() { implicit builder: GraphDSL.Builder[NotUsed] =>
      import GraphDSL.Implicits._

      val broadcast = builder.add(Broadcast[CommittableMessage[String, String]](2))
      // in0 carries the subscriber's reply (untyped, hence Any), in1 the original message.
      val zip = builder.add(Zip[Any, CommittableMessage[String, String]])

      src ~> broadcast
      broadcast ~> businessLogic ~> zip.in0
      broadcast ~> zip.in1
      // Only the original message is needed downstream; the reply just gates the commit.
      zip.out.map(_._2) ~> snk

      ClosedShape
    }).withAttributes(ActorAttributes.supervisionStrategy(decider))

    // Uses the implicit materializer declared above.
    graph.run()
    ()
  }
于 2017-03-06T23:32:49.933 回答