我有一个必须在 hadoop 集群上运行的大规模日志处理问题。任务是将日志的每一行输入一个可执行的“cmd”并检查结果以决定是否保留这一行日志。
由于“cmd”程序打开了一个非常大的字典,我无法为日志的每一行调用该程序。我想保持它运行并向它提供所需的输入。我目前的解决方案使用 python 的 subprocess 模块,这里是代码:
import sys
from subprocess import Popen, PIPE
def main():
pp = Popen('./bqc/bqc/bqc_tool ./bqc/bqc/bqc_dict/ ./bqc/bqc/word_dict/ flag', shell=True, stdin=PIPE, stdout=PIPE, stderr=PIPE)
for line in sys.stdin:
lstr = line.strip().split('\t')
if len(lstr) != 7:
continue
pp.stdin.write('%s\n' % lstr[5])
pp.stdin.flush()
out = pp.stdout.readline()
lout = out.strip().split('\t')
if len(lout) == 3 and lout[1] == '401':
print line.strip()
if __name__ == '__main__':
main()
上面的代码在我的本地机器上测试时可以找到。它在将作业提交到 hadoop 时用作映射器。我没有使用减速器,以下是配置。
hadoop streaming \
-input /path_to_input \
-output /path_to_output \
-mapper "python/python2.7/bin/python27.sh ./mapper.py" \
-cacheArchive /path_to_python/python272.tar.gz#python \
-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \
-file ./mapper.py \
-jobconf mapred.job.name="JobName" \
-jobconf mapred.job.priority=HIGH
bqc.tar.gz 中的文件如下所示:
bqc/
bqc/bqc_tool
bqc/bqc_dict/
bqc/word_dict/
在我看来,“-cacheArchive /path_to_cmd/bqc.tar.gz#bqc \”行应该提取tar文件并将它们提取到一个名为bqc的文件夹中。
但是当提交到 hadoop 集群时它会失败,并显示以下错误消息:
Traceback (most recent call last):
File "./mapper.py", line 19, in
main()
File "./mapper.py", line 11, in main
pp.stdin.write('%s\n' % lstr[5])
IOError: [Errno 32] Broken pipe
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:152)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:54)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
at org.apache.hadoop.mapred.Child.main(Child.java:194)
java.lang.RuntimeException: PipeMapRed.waitOutputThreads(): subprocess failed with code 1
at org.apache.hadoop.streaming.PipeMapRed.waitOutputThreads(PipeMapRed.java:335)
at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:590)
at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:163)
at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:61)
at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:18)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:388)
at org.apache.hadoop.mapred.Child.main(Child.java:194)
有人知道吗?任何帮助,将不胜感激!
谢谢!
扎卡里