我们有一个运行 CDH5.0.2 的四数据节点集群,通过 Cloudera Manager 包安装。为了将 13M 用户的行导入 HBase,我们编写了一个简单的 Python 脚本并使用了 hadoop-streaming jar。它可以按预期工作多达 100k 行。然后......然后,一个接一个地,所有数据节点都崩溃并显示相同的消息:
The health test result for REGION_SERVER_GC_DURATION has become bad:
Average time spent in garbage collection was 44.8 second(s) (74.60%)
per minute over the previous 5 minute(s).
Critical threshold: 60.00%.
任何按照网络上的建议(例如[1]、[2]、[3])解决问题的尝试都不会导致任何接近解决方案的地方。用 java 堆大小“玩”是没用的。唯一“解决”这种情况的是将区域服务器的垃圾收集持续时间监控周期从 5' 增加到 50'。可以说是一个肮脏的解决方法。
我们现在没有人力来为我们的 GC 使用情况创建监视器。我们最终会的,但我想知道将 13M 行导入 HBase 怎么可能导致所有区域服务器肯定崩溃。有干净的解决方案吗?
编辑:
Datanodes 上的 JVM 选项有:
-XX:+UseParNewGC -XX:+UseConcMarkSweepGC -XX:-CMSConcurrentMTEnabled -XX:CMSInitiatingOccupancyFraction=70 -XX:+CMSParallelRemarkEnabled
Datanodes 是运行 CentOS 6.5 的物理机器,每台都有 32Gb 内存和 2GHz 的 1Quadcore 和 30Mb 缓存。
下面是我们运行的 Python 脚本的摘录。我们填充了两个表:一个具有唯一用户 ID 作为行键,一个包含用户信息的列族,另一个具有我们可能希望作为行键访问的所有信息。
#!/usr/bin/env python2.7
import sys
import happybase
import json
connection = happybase.Connection(host=master_ip)
hbase_main_table = connection.table('users_table')
hbase_index_table = connection.table('users_index_table')
header = ['ID', 'COL1', 'COL2', 'COL3', 'COL4']
for line in sys.stdin:
l = line.replace('"','').strip("\n").split("\t")
if l[header.index("ID")] == "ID":
#you are reading the header
continue
for h in header[1:]:
try:
id = str(l[header.index("ID")])
col = 'info:' + h.lower()
val = l[header.index(h)].strip()
hbase_table.put(id_au_bytes, {
col: val
})
indexed = ['COL3', 'COL4']
for typ in indexed:
idx = l[header.index(typ)].strip()
if len(idx) == 0:
continue
row = hbase_index_table.row(idx)
old_ids = row.get('d:s')
if old_ids is not None:
ids = json.dumps(list(set(json.loads(old_ids)).union([id_au])))
else:
ids = json.dumps([id_au])
hbase_index.put(idx, {
'd:s': ids,
'd:t': typ,
'd:b': 'ame'
})
except:
msg = 'ERROR '+str(l[header.index("ID")])
logging.info(msg, exc_info=True)