0

我正在尝试使用 mrjob 和 Python 对 MongoDB 数据库进行地图缩减。mongodb-hadoop 连接器有关于如何使用 AWS EMR 但不与 mrjob 一起使用的示例,我并没有把所有的东西都放在一起。就 mrjob.conf 而言,这是我已经拥有的:

    enable_emr_debugging: true
    ami_version: 3.0.4
    interpreter: python2.7

    upload_files:
    - tweets-clean.txt
    - train_model.py

    python_archives:
    - mrcc.py.tar.gz

    setup:
    #- python2.7 train_model.py

    jobconf:
        mongo.job.input.format : com.mongodb.hadoop.MongoInputFormat
        mongo.input.uri : myserver:27017/twitter_db
        stream.io.identifier.resolver : com.mongodb.streaming.io.MongoIdentifierResolver


    bootstrap:
        - sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++ numpy scipy
        - sudo python2.7 get-pip.py#
        - sudo pip2.7 install boto mrjob simplejson scikit-learn sklearn pymongo-hadoop
        - python2.7 train_model.py# tweets-clean.txt#
        - mongo-hadoop-bootstrap.sh#

当使用 mrjob Python 映射器/减速器时,我使用了这样的代码:

from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):

def mapper(self, _, line):
    words=line.split()
    for word in words:
        yield word, 1

def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
   MRWordFrequencyCount.run()

要修改它以使用 mongodb-hadoop 连接器,我正在尝试这样做:

from pymongo_hadoop import BSONMapper
from pymongo_hadoop import BSONReducer
from mrjob.job import MRJob
class MRWordFrequencyCount(MRJob):

def mapper(self, _, documents):
    BSONMapper(self.bsonmapper)

def reducer(self, key, values):
    BSONReducer(self.bsonreducer)

def bsonmapper(documents):
    for doc in documents:
        yield {'_id' : doc['id']['user.id']}, {'count' : 1}

def bsonreducer(self, key, values):
    count = 0
    for v in values:
        count += v['count']
    return {'_id' : key, 'count' : count}       

if __name__ == '__main__':
    MRWordFrequencyCount.run()

问题是我没有正确地将方法传递给 BSONMapper 和 BSONReducer。BSONMapper 类在init () 中需要 1 个参数,但它得到了 2 个。

4

0 回答 0