0

我正在使用 Hadoop+ELK 堆栈来构建分析堆栈。我正在尝试每天刷新索引。

我正在使用来自第三方的 CSV 格式的数据。我无法控制输入数据,即我无法要求更改 CSV 文件的架构。

问题是 CSV 记录中没有唯一 ID,甚至组合列以生成唯一 ID 也不起作用。因此,在刷新 Elasticsearch 时会将重复数据添加到索引中。

所以,如果第 1 天的数据是这样的

Product1,Language1,Date1,$1
Product2,Language2,Date1,$12

Day2 数据变为

Product1,Language1,Date1,$1
Product2,Language2,Date1,$12
Product1,Language1,Date1,$1
Product2,Language2,Date1,$12
Product3,Language1,Date2,$5(new record added on day2)

在 ELK 中是否有任何好的方法来处理这个问题。我正在使用 Logstash 来使用 csv 文件。

4

2 回答 2

3

我认为这与文档“_id”有关。

如果每个文档都有一个唯一的“_id”,则不会有问题,因为您只需将文档“更新”为相同的值。如果需要,您甚至可以将映射设置为不允许更新。

您的问题是您没有将文档的“_id”链接到文档的内容(在某些情况下这很好)。

我想一个简单的解决方案是创建自己的“my_id”字段并将“_id”的路径设置为它,就像这里一样。

那么问题就变成了如何创建那个“my_id”字段。我会在文档上使用哈希。

一个示例 python 片段将是(我相信你可以找到一个合适的 ruby​​ 插件):

import hashlib
hash_object = hashlib.sha1(b"Product2,Language2,Date1,$12")
hex_dig = hash_object.hexdigest()
print(hex_dig)
于 2015-07-15T13:08:15.210 回答
0

我相信解决方案的第一部分将是确定一组值,如果一起使用这些值对于文档来说将是唯一的。如果不是,则无法将重复文件与真实文件分开。为了便于讨论,假设四个值 (Product1,Language1,Date1,$1) 定义了一个文档。如果存在具有相同设置值的另一个文档,则它是先前文档的副本,而不是新文档。

假设您有 (Product1,Language1,Date1,$1),您可以先执行一个查询,搜索该文档是否已存在于 ElasticSearch 中。就像是:

{
"filter": {
    "bool": {
        "must": [
            {
                "term": {
                    "pdtField": "Product1"
                }
            },
            {
                "term": {
                    "langField": "Language1"
                }
            },
            {
                "term": {
                    "dateField": "Date1"
                }
            },
            {
                "term": {
                    "costField": "$1"
                }
            }
        ]
    }
}
}

根据您实际使用的任何内容,注意此处使用的字段的名称。如果此过滤器结果有,doc_count != 0那么您无需为此创建新文档。否则,使用现有值创建一个新文档。

或者,您可以使用从 (Product1,Language1,Date1,$1) 创建的散列创建文档 ID,然后将此散列用作文档的 _id。首先检查是否存在任何具有此_id 的文档。如果它不存在,则使用哈希生成的 _id 值创建一个新文档。

如果您无法控制创建单个文档的方式,那么也许您可以尝试使用上面建议的策略预处理您的 CSV 输入,只在 CSV 中保留所需的条目并摆脱休息,然后作为通常使用生成的 CSV。

于 2015-12-24T13:08:47.697 回答