我目前正在构建一个将数据从 mongoDb 流式传输到 elasticsearch 的解决方案。我的目标是跟踪所有成功传输到 elasticsearch 的项目。我正在使用 akka-streams 和 elastic4s。目前流入 es 的样子是这样的
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = { () => elasticFinishPromise.success(()); ()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)
从我的来源来看是这样的:
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()
现在一切正常,现在我正在使用第二个接收器记录例如项目计数。但我宁愿记录真正传输到elasticsearch的项目。elastic4s 订阅者提供了一个listener: ResponseListener
withonAck(): Unit
并且onFailure(): Unit
我很想像这样将这些信息返回到流中
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink
我将如何实施?我是否需要一个自定义阶段来缓冲 和 的onAck
元素onFailure
?或者有没有更简单的方法?
谢谢你的帮助。