
I am new to Elasticsearch. I am learning it and testing whether it meets my needs.

Right now I am learning aggregations in Elasticsearch, and I wrote the following Python script to ingest some time-series data into Elasticsearch.

Every 5 seconds I create a new message, which contains:

  1. A timestamp (ISO 8601 format)
  2. A counter
  3. A random number between 0 and 100

For each new day I create a new index, using logs_Y-m-d as the index name.

I use the message counter as the _id. The counter is reset for each new index (i.e., every day).


import csv
import time
import random
from datetime import datetime
from elasticsearch import Elasticsearch


class ElasticSearchDB:
    def __init__(self):
        self.es = Elasticsearch()

    def run(self):
        print("Started: {}".format(datetime.now().isoformat()))
        print("<Ctrl + c> for exit!")

        with open("..\\out\\logs.csv", "w", newline='') as f:
            writer = csv.writer(f)
            counter = 0
            try:
                while True:
                    i_name = "logs_" + time.strftime("%Y-%m-%d")
                    if not self.es.indices.exists([i_name]):
                        self.es.indices.create(i_name, ignore=400)
                        print("New index created: {}".format(i_name))
                        counter = 0

                    message = {"counter": counter, "@timestamp": datetime.now().isoformat(), "value": random.randint(0, 100)}
                    # Write to file
                    writer.writerow(message.values())
                    # Write to elasticsearch index
                    self.es.index(index=i_name, doc_type="logs", id=counter, body=message)
                    # Waste some time
                    time.sleep(5)
                    counter += 1

            except KeyboardInterrupt:
                print("Stopped: {}".format(datetime.now().isoformat()))


test_es = ElasticSearchDB()
test_es.run()

I ran this script for 30 minutes. Then, using Sense, I ran the following aggregation queries against Elasticsearch.

Query #1: Get all documents.

Query #2: Aggregate the logs from the last 1 hour and generate stats for them. This shows correct results.

Query #3: Aggregate the logs from the last 1 minute and generate stats for them. The number of documents aggregated is the same as in the 1-hour aggregation; ideally, it should have aggregated only 12-13 logs.

Query #4: Aggregate the logs from the last 15 seconds and generate stats for them. The number of documents aggregated is again the same as in the 1-hour aggregation; ideally, it should have aggregated only 3-4 logs.

My questions:

  1. Why does Elasticsearch not understand the 1-minute and 15-second ranges?
  2. I know about mappings, but I don't know how to write one, so I haven't written one. Is that what is causing this problem?

Please help!


Query #1: Get all documents.

GET /_search

Output:

{
   "took": 3,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 314,
      "max_score": 1,
      "hits": [
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "19",
            "_score": 1,
            "_source": {
               "counter": 19,
               "value": 62,
               "@timestamp": "2016-11-03T07:40:35.981395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "22",
            "_score": 1,
            "_source": {
               "counter": 22,
               "value": 95,
               "@timestamp": "2016-11-03T07:40:51.066395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "25",
            "_score": 1,
            "_source": {
               "counter": 25,
               "value": 18,
               "@timestamp": "2016-11-03T07:41:06.140395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "26",
            "_score": 1,
            "_source": {
               "counter": 26,
               "value": 58,
               "@timestamp": "2016-11-03T07:41:11.164395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "29",
            "_score": 1,
            "_source": {
               "counter": 29,
               "value": 73,
               "@timestamp": "2016-11-03T07:41:26.214395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "41",
            "_score": 1,
            "_source": {
               "counter": 41,
               "value": 59,
               "@timestamp": "2016-11-03T07:42:26.517395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "14",
            "_score": 1,
            "_source": {
               "counter": 14,
               "value": 9,
               "@timestamp": "2016-11-03T07:40:10.857395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "40",
            "_score": 1,
            "_source": {
               "counter": 40,
               "value": 9,
               "@timestamp": "2016-11-03T07:42:21.498395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "24",
            "_score": 1,
            "_source": {
               "counter": 24,
               "value": 41,
               "@timestamp": "2016-11-03T07:41:01.115395"
            }
         },
         {
            "_index": "logs_2016-11-03",
            "_type": "logs",
            "_id": "0",
            "_score": 1,
            "_source": {
               "counter": 0,
               "value": 79,
               "@timestamp": "2016-11-03T07:39:00.302395"
            }
         }
      ]
   }
}

Query #2: Get stats for the last 1 hour.

GET /logs_2016-11-03/logs/_search?search_type=count
{
    "aggs": {
        "time_range": {
            "filter": {
                "range": {
                    "@timestamp": {
                        "from": "now-1h"
                    }
                }
            },
            "aggs": {
                "just_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    }
}

Output:

{
   "took": 5,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 366,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "time_range": {
         "doc_count": 366,
         "just_stats": {
            "count": 366,
            "min": 0,
            "max": 100,
            "avg": 53.17213114754098,
            "sum": 19461
         }
      }
   }
}

I get 366 entries, which is correct.

Query #3: Get stats for the last 1 minute.

GET /logs_2016-11-03/logs/_search?search_type=count
{
    "aggs": {
        "time_range": {
            "filter": {
                "range": {
                    "@timestamp": {
                        "from": "now-1m"
                    }
                }
            },
            "aggs": {
                "just_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    }
}

Output:

{
   "took": 15,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 407,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "time_range": {
         "doc_count": 407,
         "just_stats": {
            "count": 407,
            "min": 0,
            "max": 100,
            "avg": 53.152334152334156,
            "sum": 21633
         }
      }
   }
}

This is wrong: there cannot be 407 entries within the last 1 minute; it should have been only 12-13 logs.

Query #4: Get stats for the last 15 seconds.

GET /logs_2016-11-03/logs/_search?search_type=count
{
    "aggs": {
        "time_range": {
            "filter": {
                "range": {
                    "@timestamp": {
                        "from": "now-15s"
                    }
                }
            },
            "aggs": {
                "just_stats": {
                    "stats": {
                        "field": "value"
                    }
                }
            }
        }
    }
}

Output:

{
   "took": 15,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 407,
      "max_score": 0,
      "hits": []
   },
   "aggregations": {
      "time_range": {
         "doc_count": 407,
         "just_stats": {
            "count": 407,
            "min": 0,
            "max": 100,
            "avg": 53.152334152334156,
            "sum": 21633
         }
      }
   }
}

This is also wrong: there cannot be 407 entries within the last 15 seconds; it should have been only 3-4 logs.


1 Answer


Your queries are correct, but ES stores dates in UTC, which is why you get everything back. From the documentation:

> In JSON documents, dates are represented as strings. Elasticsearch uses a set of preconfigured formats to recognize and parse these strings into a long value representing milliseconds-since-the-epoch in UTC.

You could use the pytz module and store dates in UTC in ES. See this SO question.

You could also use the time_zone parameter in the range query. Additionally, it is better to aggregate over filtered results rather than fetch everything and then filter everything.
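As a minimal sketch (using the standard-library `timezone` rather than `pytz`; the idea is the same), the script's message could be built with a timezone-aware UTC timestamp instead of a naive local one:

```python
from datetime import datetime, timezone

def utc_timestamp():
    # An aware datetime in UTC; isoformat() appends the +00:00 offset,
    # so Elasticsearch's default UTC interpretation matches the data.
    return datetime.now(timezone.utc).isoformat()

message = {"counter": 0, "@timestamp": utc_timestamp(), "value": 42}
```

With timestamps stored this way, relative ranges like `now-1m` line up with the indexed values without any time_zone adjustments.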

GET /logs_2016-11-03/logs/_search
{
  "query": {
    "bool": {
      "filter": {
        "range": {
          "@timestamp": {
            "gte": "2016-11-03T07:15:35",         <----- You would need absolute value
            "time_zone": "-01:00"              <---- timezone setting
          }
        }
      }
    }
  },
  "aggs": {
    "just_stats": {
      "stats": {
        "field": "value"
      }
    }
  },
  "size": 0
}

You would have to convert the desired relative times (now-1m, now-15s) into the absolute format yyyy-MM-dd'T'HH:mm:ss for use with the time_zone parameter, since now is not affected by time_zone. So the best option is to convert your dates to UTC and store them that way.
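As a hypothetical helper (the function name is my own, not part of any API), converting a relative window such as "1 minute ago" into the absolute UTC string the range query expects could look like this:

```python
from datetime import datetime, timedelta, timezone

def absolute_utc(seconds_ago):
    # Compute an absolute point in time in UTC and format it as
    # yyyy-MM-dd'T'HH:mm:ss for use in a range query's "gte" value.
    t = datetime.now(timezone.utc) - timedelta(seconds=seconds_ago)
    return t.strftime("%Y-%m-%dT%H:%M:%S")

one_minute_ago = absolute_utc(60)   # use instead of "now-1m"
fifteen_s_ago = absolute_utc(15)    # use instead of "now-15s"
```

The returned string can then be placed in the `"gte"` field of the range filter shown above, together with the appropriate `"time_zone"` value.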

answered 2016-11-03T05:32:42.400