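I'm trying to run MapReduce over a MongoDB database using mrjob and Python. The mongodb-hadoop connector has examples of how to use it with AWS EMR, but none that use it with mrjob, and I haven't managed to put all the pieces together. For mrjob.conf, this is what I have so far: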
enable_emr_debugging: true
ami_version: 3.0.4
interpreter: python2.7
upload_files:
  - tweets-clean.txt
  - train_model.py
python_archives:
  - mrcc.py.tar.gz
setup:
  #- python2.7 train_model.py
jobconf:
  mongo.job.input.format: com.mongodb.hadoop.MongoInputFormat
  mongo.input.uri: myserver:27017/twitter_db
  stream.io.identifier.resolver: com.mongodb.streaming.io.MongoIdentifierResolver
bootstrap:
  - sudo yum --releasever=2014.09 install -y python27 python27-devel gcc-c++ numpy scipy
  - sudo python2.7 get-pip.py#
  - sudo pip2.7 install boto mrjob simplejson scikit-learn sklearn pymongo-hadoop
  - python2.7 train_model.py# tweets-clean.txt#
  - mongo-hadoop-bootstrap.sh#
With plain mrjob Python mappers/reducers, I used code like this:
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, line):
        words = line.split()
        for word in words:
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRWordFrequencyCount.run()
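For reference, what Hadoop Streaming does with these two methods can be sketched in plain Python, without mrjob or a cluster: every input line goes through the mapper, the emitted pairs are grouped by key, and each group is handed to the reducer. This is a simplified local simulation (no partitioning, combiners, or serialization), and `run_locally` is a hypothetical helper, not part of mrjob.

```python
from collections import defaultdict


def mapper(_, line):
    # Same logic as MRWordFrequencyCount.mapper: emit (word, 1) for each word.
    for word in line.split():
        yield word, 1


def reducer(key, values):
    # Same logic as MRWordFrequencyCount.reducer: sum the counts for one key.
    yield key, sum(values)


def run_locally(lines):
    # Hypothetical helper. Map phase: the input key is unused (None), the value is the line.
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(None, line):
            grouped[key].append(value)
    # Shuffle/sort phase: keys reach the reducer in sorted order, one group at a time.
    result = {}
    for key in sorted(grouped):
        for out_key, out_value in reducer(key, grouped[key]):
            result[out_key] = out_value
    return result


if __name__ == "__main__":
    print(run_locally(["a b a", "b c"]))
```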
To adapt it to the mongodb-hadoop connector, I'm trying this:
from pymongo_hadoop import BSONMapper
from pymongo_hadoop import BSONReducer
from mrjob.job import MRJob

class MRWordFrequencyCount(MRJob):
    def mapper(self, _, documents):
        BSONMapper(self.bsonmapper)

    def reducer(self, key, values):
        BSONReducer(self.bsonreducer)

    def bsonmapper(documents):
        for doc in documents:
            yield {'_id' : doc['id']['user.id']}, {'count' : 1}

    def bsonreducer(self, key, values):
        count = 0
        for v in values:
            count += v['count']
        return {'_id' : key, 'count' : count}

if __name__ == '__main__':
    MRWordFrequencyCount.run()
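The computation this code is aiming for (a count of documents per user id) can be checked locally with plain functions before any Hadoop or mrjob wiring. This is only a sketch under stated assumptions: the tweet documents are assumed to look like `{'user': {'id': ...}}`, the mapper is assumed to emit one dict per document with the grouping key in `_id` (rather than the pair of dicts yielded above), and `simulate` is a hypothetical stand-in for whatever grouping the connector performs, not pymongo_hadoop code.

```python
from collections import defaultdict


def bsonmapper(documents):
    # Plain function (no self): takes an iterable of documents and yields one
    # output document per input.
    # ASSUMPTION: documents look like {'user': {'id': ...}}; adjust to the real schema.
    for doc in documents:
        yield {'_id': doc['user']['id'], 'count': 1}


def bsonreducer(key, values):
    # Plain function (no self): sums the 'count' field of every value for one key.
    count = 0
    for v in values:
        count += v['count']
    return {'_id': key, 'count': count}


def simulate(documents):
    # Hypothetical local stand-in for the connector: group mapper output by '_id',
    # then call the reducer once per key, in sorted key order.
    grouped = defaultdict(list)
    for out in bsonmapper(documents):
        grouped[out['_id']].append(out)
    return [bsonreducer(key, grouped[key]) for key in sorted(grouped)]


if __name__ == "__main__":
    tweets = [{'user': {'id': 1}}, {'user': {'id': 2}}, {'user': {'id': 1}}]
    print(simulate(tweets))
```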
The problem is that I'm not passing the methods to BSONMapper and BSONReducer correctly: I get an error saying that BSONMapper's __init__() takes 1 argument but was given 2.
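One plausible source of a "takes 1 argument, 2 given" error, assuming BSONMapper calls the function it receives with a single argument (the input documents): `bsonmapper` is declared without `self`, so `self.bsonmapper` is a bound method, and calling it with one argument actually passes two (the instance and the documents). `bsonreducer` does declare `self`, so the same pattern would not fail there. The sketch below reproduces this with plain Python; `Job` and `call_target` are hypothetical stand-ins, not pymongo_hadoop or mrjob code.

```python
class Job(object):
    # Hypothetical stand-in for MRWordFrequencyCount.

    def no_self(documents):
        # Declared without `self`, like bsonmapper in the question.
        return list(documents)

    def with_self(self, documents):
        # Declared with `self`, like bsonreducer in the question.
        return list(documents)

    @staticmethod
    def static(documents):
        # A staticmethod receives no implicit instance argument.
        return list(documents)


def call_target(target, documents):
    # Hypothetical stand-in for a wrapper that calls target(documents).
    return target(documents)


def explain():
    job = Job()
    try:
        # job.no_self is a bound method, so this passes (job, documents): 2 arguments.
        call_target(job.no_self, [1, 2])
        error = None
    except TypeError as exc:
        error = str(exc)
    return error, call_target(job.with_self, [1, 2]), call_target(job.static, [1, 2])


if __name__ == "__main__":
    print(explain())
```

Either adding `self` to the method or marking it `@staticmethod` gives the callee the one-argument signature the caller expects.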