0

我目前正在构建一个将数据从 mongoDb 流式传输到 elasticsearch 的解决方案。我的目标是跟踪所有成功传输到 elasticsearch 的项目。我正在使用 akka-streams 和 elastic4s。目前流入 es 的样子是这样的

val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize,
    completionFn = { () => elasticFinishPromise.success(()); ()},
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
    concurrentRequests = concurrentRequests
    )
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)

从我的来源来看是这样的:

val a: [NotUsed] = mongoSrc
  .via(some operations..)
  .to(esSink)
  .run()

现在一切正常,现在我正在使用第二个接收器记录例如项目计数。但我宁愿记录真正传输到elasticsearch的项目。elastic4s 订阅者提供了一个listener: ResponseListenerwithonAck(): Unit并且onFailure(): Unit我很想像这样将这些信息返回到流中

val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item

mongoSrc ~> doStuff ~> esSink ~> logSink

我将如何实施?我是否需要一个自定义阶段来缓冲 和 的onAck元素onFailure?或者有没有更简单的方法?

谢谢你的帮助。

4

1 回答 1

1

您可以Subscriber[T]通过利用Flow.fromSinkAndSource. 查看文档中的“复合流(来自 Sink 和 Source)”插图。

在这种情况下,您将附加您的自定义 actorPublisher 作为源并从onAck().

由于您要求更简单的方法:

val doStuff = Flow[DocToIndex]
                .grouped(batchSize)
                .mapAsync(concurrentRequests)(bulkopFuture)

简而言之,除了所有有用的抽象之外,elastic4s 订阅者只是一个批量更新请求

于 2016-07-30T17:38:32.847 回答