0

当我执行以下操作时,一切都在本地正常工作:

Cat data | mapper.py | sort | combiner.py | reducer.py  but when I ran this in Hadoop - combiner keeps on running without sending any output to reducer. Finally job gets killed.

显示"Java.io.IOException: Bad file descriptor" and "WARN org.apache.hadoop.streaming.PipeMapRed: java.io.IOException: Broken pipe"

这只发生在我使用组合器运行时,而不是当我使用映射器和减速器运行时。

映射器:

#!/usr/bin/python

import sys
import csv
import re

def mapper():
    reader = csv.reader(sys.stdin, delimiter='\t')



    for line in reader:

        if line[0] =="id":
            continue
        body = line[4].strip()
        ids =line[0]
        #print body
        body_split =re.split('\W+', body)



        for i in range(len(body_split)):
            p =re.compile('[a-z]+')
            if p.match(body_split[i].lower()):
                print "{0}\t{1}\t{2}".format(body_split[i].lower(), line[0],1)



mapper()

合路器:

enter code here
#!/usr/bin/python
import sys

oldKey =None 
old_doc =None
doc_count =0


for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc,nos = data_mapped

    if oldKey and oldKey != thisKey:
        print oldKey, "\t", old_doc,"\t",str(doc_count)
        doc_list = []
        doc_count =0
        old_doc=None

    oldKey = thisKey

    if old_doc ==None:
        old_doc =thisDoc
        doc_count +=int(1)
        continue
    if old_doc !=None:
        if thisDoc ==old_doc:
            doc_count +=int(1)
            continue
        else:
            print oldKey, "\t", old_doc,"\t",str(doc_count)
            old_doc =thisDoc
            doc_count =0
            doc_count +=int(1)

if oldKey != None:
    print oldKey, "\t", old_doc,"\t",str(doc_count)

减速器:

enter code here
#!/usr/bin/python
import sys

oldKey =None 
doc_list = []
doc_count =0


for line in sys.stdin:
    data_mapped = line.strip().split("\t")
    if len(data_mapped) != 3:
        # Something has gone wrong. Skip this line.
        continue

    thisKey, thisDoc,nos = data_mapped

    if oldKey and oldKey != thisKey:
        print oldKey, "\t", sorted(doc_list), "\t",doc_count
        doc_list = []
        doc_count =0
        oldKey=None

    oldKey = thisKey
    doc_count +=int(nos)
    if int(thisDoc) not in doc_list:
        doc_list.append(int(thisDoc))
        #print doc_list

if oldKey != None:
    print oldKey, "\t", sorted(doc_list), "\t",doc_count

问题类似于“倒排索引” - 最终输出将是 < word , [list of docs],count>

任何帮助都会很好。

4

0 回答 0