1

我有一个 Alpakka Elasticsearch Sink,我在请求之间保留了它。当我收到请求时,我Source从 HTTP 请求创建一个并将其转换Source为 ElasticsearchWriteMessage的一个,然后使用mySource.runWith(theElasticseachSink).

  1. 来源完成后如何收到通知?似乎没有什么有用的东西被实现。
  2. 源的完成是否会传递给接收器,这意味着我每次都必须创建一个新的?
  3. 如果以上是肯定的,会在Flow.fromSourceAndSink帮助下以某种方式将它们解耦吗?

我的目标是知道 HTTP 下载何时完成(包括via它所经过的 s)并能够重用接收器。

4

2 回答 2

0

您可以随意传递流程的单个部分,甚至可以传递整个可执行图(这些是不可变的)。run() 调用具体化流程,但不会更改您的图形或其部分。

1)既然您想知道 HttpDownload 何时通过流程,为什么不使用完整图 Future[Done] ?假设您对 elasticsearch 的调用是异步的,这应该是相等的,因为您的接收器只是触发调用而不等待。您还可以使用 Source.queue ( https://doc.akka.io/docs/akka/2.5/stream/operators/Source/queue.html ) 并将您的消息添加到队列中,然后重用定义的图表您可以在需要处理时添加新消息。这也实现了 SourceQueueWithComplete 允许您停止流。除此之外,在需要的地方重用接收器,而无需等待另一个流使用它。

2)如上所述:不,您不需要多次实例化接收器。

最好的问候, 安迪

于 2019-02-02T15:40:46.537 回答
0

事实证明,Alpakka 的 Elasticsearch 库也支持流形状,所以我可以让我的源代码通过它并通过任何实现未来的接收器运行它。Sink.foreach在这里可以很好地用于测试目的,例如在https://github.com/danellis/akka-es-test中。

Flow fromFunction { product: Product =>
    WriteMessage.createUpsertMessage(product.id, product.attributes)
} via ElasticsearchFlow.create[Map[String, String]](index, "_doc")

定义es.flow然后

val graph = response.entity.withSizeLimit(MaxFeedSize).dataBytes
    .via(scanner)
    .via(CsvToMap.toMap(Utf8))
    .map(attrs => Product(attrs("id").decodeString(Utf8), attrs.mapValues(_.decodeString(Utf8))))
    .via(es.flow)

val futureDone = graph.runWith(Sink.foreach(println))

futureDone onComplete {
    case Success(_) => println("Done")
    case Failure(e) => println(e)
}
于 2019-02-03T06:38:07.360 回答