3

我正在尝试将数据从配置单元表中插入弹性搜索。

    CREATE EXTERNAL TABLE IF NOT EXISTS es_temp_table (
        dt    STRING,
        text  STRING
    )
    STORED BY 'org.elasticsearch.hadoop.hive.EsStorageHandler'
    TBLPROPERTIES('es.resource'='aggr_2014-10-01/metric','es.index.auto.create'='true')
    ;

    INSERT OVERWRITE TABLE es_temp_table
    SELECT dt, description
    FROM other_table

但是,数据已关闭。当我在我的另一张桌子上做一个计数(*)时,我得到了 6,000 行。当我搜索 aggr_2014-10-01 索引时,我看到了 10,000 条记录!不知何故,记录被复制(行被复制多次)。也许我可以删除弹性搜索中的重复记录?不知道我会怎么做。

我相信这可能是 Hive/Qubole 为每个映射生成两个任务的结果。如果一个映射器成功,它会尝试杀死另一个映射器。然而,另一个任务已经造成了损害(也就是插入到 ElasticSearch 中)。这是我最好的猜测,但我更愿意知道确切的原因以及是否有办法解决它。

set mapred.map.tasks.speculative.execution=false;

我发现的一件事是将推测执行设置为 false,以便每个映射器只生成一个任务(参见上面的设置)。但是,现在我看到低估了。我相信这可能是由于记录被跳过,但我无法诊断为什么这些记录会首先被跳过。

在这个版本中,这也意味着即使一个任务/映射器失败,整个作业也会失败,然后我需要删除索引(部分数据已上传)并重新运行整个作业(大约需要 4 小时)。

[进度更新]

我试图通过将所有工作放在减速器中来解决这个问题(这是只产生一个任务以确保没有重复记录插入的唯一方法)。

INSERT OVERWRITE TABLE es_temp_table
SELECT dt, description
FROM other_table
DISTRIBUTE BY cast(rand()*250 as int);

然而,我现在看到了一个巨大的低估!现在只有 2,000 条记录。弹性搜索确实估计了一些东西,但没有达到这个程度。ElasticSearch 中的记录只是较少。这可能是由于任务失败(不再重试)。它可能是从 Qubole/Hive 传递格式错误的条目时开始的。但我设置:

set mapreduce.map.skip.maxrecords=1000;

以下是我的查询的其他一些设置:

set es.nodes=node-names
set es.port=9200;
set es.bulk.size.bytes=1000mb;
set es.http.timeout=20m;
set mapred.tasktracker.expiry.interval=3600000;
set mapred.task.timeout=3600000;
4

1 回答 1

1

我确定了问题。正如我所怀疑的那样,插入跳过了一些被认为是“坏”的记录。我永远无法找到确切被跳过的记录,但我尝试用空格替换所有非字母数字字符。这解决了问题!不再跳过记录,所有数据都上传到 Elastic Search。

INSERT OVERWRITE TABLE es_temp_table
SELECT dt, REGEXP_REPLACE(description, '[^0-9a-zA-Z]+', ' ')
FROM other_table
于 2015-02-20T19:23:34.213 回答