
Is there a direct way to address the following error, or is there a better overall way to use Hive to get the join I need? Output to a stored table isn't a requirement; I would be content with an INSERT OVERWRITE LOCAL DIRECTORY to a CSV.

I am trying to perform the following cross join. ipintegers is a 9GB table, and geoiplite is 270MB.

CREATE TABLE iplatlong_sample AS
SELECT ipintegers.networkinteger, geoiplite.latitude, geoiplite.longitude
FROM geoiplite
CROSS JOIN ipintegers
WHERE ipintegers.networkinteger >= geoiplite.network_start_integer AND ipintegers.networkinteger <= geoiplite.network_last_integer;

I use CROSS JOIN on ipintegers instead of geoiplite because I have read that the rule is for the smaller table to be on the left, larger on the right.

Map and Reduce stages complete to 100% according to Hive, but then the job fails:

2015-08-01 04:45:36,947 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 8767.09 sec

MapReduce Total cumulative CPU time: 0 days 2 hours 26 minutes 7 seconds 90 msec

Ended Job = job_201508010407_0001

Stage-8 is selected by condition resolver.

Execution log at: /tmp/myuser/.log

2015-08-01 04:45:38 Starting to launch local task to process map join; maximum memory = 12221153280

Execution failed with exit status: 3

Obtaining error information

Task failed!

Task ID: Stage-8

Logs:

/tmp/myuser/hive.log

FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

MapReduce Jobs Launched: Job 0: Map: 38 Reduce: 1 Cumulative CPU: 8767.09 sec
HDFS Read: 9438495086 HDFS Write: 8575548486 SUCCESS

My hive config:

SET hive.mapred.local.mem=40960;
SET hive.exec.parallel=true;
SET hive.exec.compress.output=true;
SET hive.exec.compress.intermediate = true;
SET hive.optimize.skewjoin = true;
SET mapred.compress.map.output=true;
SET hive.stats.autogather=false;

I have varied SET hive.auto.convert.join between true and false but with the same result.

Here are the errors in the output log from /tmp/myuser/hive.log

$ tail -12 -f /tmp/myuser/hive.log

2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Execution failed with exit status: 3
2015-08-01 07:30:46,086 ERROR exec.Task (SessionState.java:printError(419)) - Obtaining error information
2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) -
Task failed!
Task ID:
  Stage-8

Logs:

2015-08-01 07:30:46,087 ERROR exec.Task (SessionState.java:printError(419)) - /tmp/myuser/hive.log
2015-08-01 07:30:46,087 ERROR mr.MapredLocalTask (MapredLocalTask.java:execute(268)) - Execution failed with exit status: 3
2015-08-01 07:30:46,094 ERROR ql.Driver (SessionState.java:printError(419)) - FAILED: Execution Error, return code 3 from org.apache.hadoop.hive.ql.exec.mr.MapredLocalTask

I am running the Hive client on the master, a Google Cloud Platform instance of type n1-highmem-8 (8 CPU, 52GB); the workers are n1-highmem-4 (4 CPU, 26GB). I suspect that after MAP and REDUCE a local join (as implied) takes place on the master. Regardless, in bdutils I configured the JAVAOPTS for the worker nodes (n1-highmem-4) to: n1-highmem-4

SOLUTION EDIT: The solution is to organize the range data into a range tree.


1 Answer


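I don't think it is possible to perform this cross join by brute force: just multiply the row counts and you can see it gets out of hand. You need some optimization, which I don't think Hive is capable of yet.

However, this problem can actually be solved in O(N1+N2) time, provided your data is sorted (which Hive can do for you). You walk through both lists simultaneously: at each step you take an IP integer, check whether any intervals start at or before this integer, add them, remove those that have ended, emit the matching tuples, and so on. Pseudocode: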

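active = []                                      # intervals that may still contain x
ipintegers = iter(ipintegers_sorted_file)        # IP integers, ascending
pending = iter(intervals_sorted_on_start_file)   # intervals, ascending by start
nxt = next(pending, None)
for x in ipintegers:
    # drop the intervals that ended before x
    active = [i for i in active if i.end >= x]
    # admit every interval that starts at or before x
    while nxt is not None and nxt.start <= x:
        if nxt.end >= x:
            active.append(nxt)
        nxt = next(pending, None)
    # everything still active contains x
    for i in active:
        output_match(i, x)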

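Now, if you have an external script or UDF that knows how to read the smaller table, takes IP integers as input, and emits the matching tuples as output, you can use Hive with SELECT TRANSFORM to stream the input to it.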
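As a rough illustration of such a streaming script, here is a minimal Python sketch. It assumes the GeoIP ranges do not overlap (so a binary search on the range starts is enough), that the smaller table has been exported as tab-separated rows of start, end, latitude, longitude, and that the script receives one IP integer per tab-separated input line, which is how Hive's TRANSFORM feeds rows by default. The function names and the file passed on the command line are hypothetical, not part of the original question.

```python
import bisect
import sys

def load_ranges(rows):
    """rows: iterable of (start, end, lat, lon). Returns (starts, ranges) sorted by start."""
    ranges = sorted(rows, key=lambda r: r[0])
    starts = [r[0] for r in ranges]
    return starts, ranges

def match(ip, starts, ranges):
    """Return (lat, lon) of the range containing ip, or None (assumes non-overlapping ranges)."""
    pos = bisect.bisect_right(starts, ip) - 1   # last range that starts at or before ip
    if pos >= 0 and ranges[pos][1] >= ip:
        return ranges[pos][2], ranges[pos][3]
    return None

def transform(lines, starts, ranges):
    """Yield one tab-separated output row for each input IP that falls inside a range."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        ip = int(line.split("\t")[0])
        hit = match(ip, starts, ranges)
        if hit is not None:
            yield "%d\t%s\t%s" % (ip, hit[0], hit[1])

if __name__ == "__main__" and len(sys.argv) > 1:
    # sys.argv[1]: hypothetical tab-separated export of the smaller table
    rows = []
    with open(sys.argv[1]) as f:
        for raw in f:
            s, e, lat, lon = raw.rstrip("\n").split("\t")
            rows.append((int(s), int(e), lat, lon))
    starts, ranges = load_ranges(rows)
    for out in transform(sys.stdin, starts, ranges):
        print(out)
```

On the Hive side, the script and the exported table would be shipped with ADD FILE and invoked through SELECT TRANSFORM(networkinteger) USING '...' AS (networkinteger, latitude, longitude). Each task then loads the 270MB table once and answers every lookup in logarithmic time, so no cross join is materialized.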

Or you can probably just run this algorithm on a local machine with the two input files, because it is only O(N), and even 9 GB of data is very doable.
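For the local variant, a minimal sketch of the sweep as a Python generator might look like the following. It assumes both inputs are already sorted (IP integers ascending, intervals ascending by start) and uses a min-heap keyed on the interval end so that expired intervals are dropped cheaply; the name sweep_join and the tuple layout are illustrative choices, not something from the original post.

```python
import heapq

def sweep_join(ips, intervals):
    """Yield (ip, payload) for every ip that lies inside an interval.

    ips:       ascending iterable of integers.
    intervals: iterable of (start, end, payload), ascending by start.
    """
    pending = iter(intervals)
    nxt = next(pending, None)
    active = []   # min-heap of (end, seq, payload); seq breaks ties between equal ends
    seq = 0
    for x in ips:
        # admit every interval that starts at or before x
        while nxt is not None and nxt[0] <= x:
            heapq.heappush(active, (nxt[1], seq, nxt[2]))
            seq += 1
            nxt = next(pending, None)
        # retire the intervals that ended before x
        while active and active[0][0] < x:
            heapq.heappop(active)
        # everything left in the heap contains x
        for _end, _seq, payload in active:
            yield x, payload
```

Both arguments can be lazy iterators, for example rows parsed from csv.reader over the two sorted files, so neither table has to fit in memory.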

answered 2015-08-01T19:41:51.707