0

我正在尝试将日志转发到弹性搜索并陷入动态设置索引(按输入数据中的字段)。

我的输入数据格式是 JSON 并且总是有键“es_idx”。我希望通过该键转发到elasticsearch并添加时间戳,我使用logstash_formattrue来实现时间戳功能并logstash_prefix设置“fluentd”以外的索引名称

这就是我的流利配置的样子:

# fluentd/conf/fluent.conf
<source>
  type stdin
  # Input pattern. It depends on Parser plugin
  format json

  # Optional. default is stdin.events
</source>

<match *.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    host <es-host>
    port<es-port>
    logstash_format true
    logstash_prefix ${$record["es_idx"]}
    type_name fluentd
    flush_interval 5s
  </store>

</match>

使用以下输入 {"tenant_id":"test","es_idx":"blabla"} 时,我收到以下错误:

2020-05-27 10:38:06 +0300 [warn]: #0 dump an error event: error_class=Fluent::Plugin::ElasticsearchErrorHandler::ElasticsearchError error="400 - Rejected by Elasticsearch" location=nil tag="stdin.events" time=2020-05-27 10:37:59.498450000 +0300 record={"tenant_id"=>"test", "es_idx"=>"blabla"}

如果我将 logstash_pattern 设置为像这样的其他字符串:“logstash_pattern blabla”,它可以正常工作。

有谁知道可能是什么问题?

4

2 回答 2

3

要使用动态弹性搜索,您需要使用此处描述的块键 在您的情况下,您可能需要这样的配置

<match *.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    host <es-host>
    port<es-port>
    logstash_format true
    logstash_prefix ${es_idx}
    logstash_dateformat %Y%m%d
    type_name fluentd
    flush_interval 5s

    <buffer es_idx>
      @type file
      path /fluentd/log/elastic-buffer
      flush_thread_count 8
      flush_interval 1s
      chunk_limit_size 32M
      queue_limit_length 4
      flush_mode interval
      retry_max_interval 30
      retry_forever true
    </buffer>
  </store>
</match>

另一种选择是使用 elasticsearch_dynamic

<match my.logs.*>
  @type elasticsearch_dynamic
  hosts ${record['host1']}:9200,${record['host2']}:9200
  index_name my_index.${Time.at(time).getutc.strftime(@logstash_dateformat)}
  logstash_prefix ${tag_parts[3]}
  port ${9200+rand(4)}
  index_name ${tag_parts[2]}-${Time.at(time).getutc.strftime(@logstash_dateformat)}
</match>
于 2020-06-05T20:22:42.127 回答
-1

成功从记录对象中获取值,如下所示:

<match *.**>
  @type copy
  <store>
    @type stdout
  </store>
  <store>
    @type elasticsearch
    @log_level debug
    host <host>
    logstash_format true
    logstash_prefix ${es_index_pattern}
    type_name fluentd
    flush_interval 5s
    <buffer tag, es_index_pattern>
      @type memory
    </buffer>
  </store>
</match>
于 2020-06-08T09:56:55.970 回答