事实证明,Alpakka 的 Elasticsearch 库也支持流形状,所以我可以让我的源代码通过它并通过任何实现未来的接收器运行它。Sink.foreach
在这里可以很好地用于测试目的,例如在https://github.com/danellis/akka-es-test中。
Flow fromFunction { product: Product =>
WriteMessage.createUpsertMessage(product.id, product.attributes)
} via ElasticsearchFlow.create[Map[String, String]](index, "_doc")
定义es.flow
然后
val graph = response.entity.withSizeLimit(MaxFeedSize).dataBytes
.via(scanner)
.via(CsvToMap.toMap(Utf8))
.map(attrs => Product(attrs("id").decodeString(Utf8), attrs.mapValues(_.decodeString(Utf8))))
.via(es.flow)
val futureDone = graph.runWith(Sink.foreach(println))
futureDone onComplete {
case Success(_) => println("Done")
case Failure(e) => println(e)
}