4

我已经使用 Akka Streams 和 Alpakka 编写了一些代码,这些代码从 Amazon SQS 读取并索引 Elasticsearch 中的事件。一切都很顺利,性能也很棒,但我对索引名称有疑问。我有这个代码:

class ElasticSearchIndexFlow(restClient: RestClient) {

  private val elasticSettings = ElasticsearchSinkSettings(bufferSize = 10)

  def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
    ElasticsearchFlow.create[DomainEvent](index, "domain-event", elasticSettings)(
      restClient,
      DomainEventMarshaller.domainEventWrites
    )

  private def index = {
    val now = DateTime.now()
    s"de-${now.getYear}.${now.getMonthOfYear}.${now.getDayOfMonth}"
  }
}

问题是在运行流程几天后,索引名称没有改变。我想 Akka Streams 在幕后创建了一个融合的演员,并且index获取索引名称的函数仅在执行开始时进行评估。

知道如何根据当前日期使用索引名称对 ES 中的事件进行索引吗?

4

1 回答 1

0

该问题的解决方案是在上一步中设置索引名称IncomingMessage.withIndexName

所以:

def flow: Flow[(DomainEvent, Message), IncomingMessage[DomainEvent, Message], NotUsed] =
  Flow[(DomainEvent, Message)].map {
    case (domainEvent, message) =>
      IncomingMessage(Some(domainEvent.eventId), domainEvent, message)
        .withIndexName(indexName(domainEvent.ocurredOn))
}

和:

def flow: Flow[IncomingMessage[DomainEvent, NotUsed], Seq[IncomingMessageResult[DomainEvent, NotUsed]], NotUsed] =
  ElasticsearchFlow.create[DomainEvent]("this-index-name-is-not-used", "domain-event", elasticSettings)(
    restClient,
    DomainEventMarshaller.domainEventWrites
  )
于 2018-04-19T08:07:05.443 回答