我已经使用 Akka Streams 和 Alpakka 编写了一些代码,这些代码从 Amazon SQS 读取并索引 Elasticsearch 中的事件。一切都很顺利,性能也很棒,但我对索引名称有疑问。我有这个代码:
class ElasticSearchIndexFlow(restClient: RestClient) {
private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)
def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
restClient,
DomainEventMarshaller.domainEventWrites
)
private def index = {
val now = DateTime.now()
s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
}
}
问题是在运行流程几天后,索引名称没有改变。我想 Akka Streams 在幕后创建了一个融合的演员,并且index
获取索引名称的函数仅在执行开始时进行评估。
知道如何根据当前日期使用索引名称对 ES 中的事件进行索引吗?