好吧，我想最好的答案是 RTFC（即“去读源码”）:P
如果您查看 /usr/lib/python2.6/site-packages/mrjob/job.py（假设您是在 Python 2.6 上用 pip 安装的 mrjob），就会看到它究竟是如何从输入中逐行读取数据、并对每一行运行 mapper 的：
def run_mapper(self, step_num=0):
    """Run the mapper for step *step_num* over the job's input.

    Excerpt from mrjob/job.py as quoted in this answer; part of the body
    is elided (the ``...`` line below).

    :param step_num: which step's mapper to run (defaults to 0).
    """
    ...  # elided in the quote; presumably binds mapper, mapper_init and mapper_final (not visible here)
    # pick input and output protocol
    read_lines, write_line = self._wrap_protocols(step_num, 'mapper')
    # Optional init hook: its pairs are written before any input is read.
    # ``or ()`` treats a falsy return (e.g. None) as "emitted nothing".
    if mapper_init:
        for out_key, out_value in mapper_init() or ():
            write_line(out_key, out_value)
    # run the mapper on each line
    # read_lines() yields already-decoded (key, value) pairs.
    for key, value in read_lines():
        for out_key, out_value in mapper(key, value) or ():
            write_line(out_key, out_value)
    # Optional final hook: runs once, after all input has been consumed.
    if mapper_final:
        for out_key, out_value in mapper_final() or ():
            write_line(out_key, out_value)
下面是 read_lines 的定义（它定义在 _wrap_protocols 内部，并与 write_line 一起作为元组返回）：
def _wrap_protocols(self, step_num, step_type):
"""Pick the protocol classes to use for reading and writing
for the given step, and wrap them so that bad input and output
trigger a counter rather than an exception unless --strict-protocols
is set.
Returns a tuple of ``(read_lines, write_line)``
``read_lines()`` is a function that reads lines from input, decodes
them, and yields key, value pairs.
``write_line()`` is a function that takes key and value as args,
encodes them, and writes a line to output.
:param step_num: which step to run (e.g. 0)
:param step_type: ``'mapper'``, ``'reducer'``, or ``'combiner'`` from
:py:mod:`mrjob.step`
"""
read, write = self.pick_protocols(step_num, step_type)
def read_lines():
for line in self._read_input():
try:
key, value = read(line.rstrip('\r\n'))
yield key, value
except Exception, e:
if self.options.strict_protocols:
raise
else:
self.increment_counter(
'Undecodable input', e.__class__.__name__)
def write_line(key, value):
try:
print >> self.stdout, write(key, value)
except Exception, e:
if self.options.strict_protocols:
raise
else:
self.increment_counter(
'Unencodable output', e.__class__.__name__)
return read_lines, write_line
最后，您还可以阅读 /usr/lib/python2.6/site-packages/mrjob/util.py 中的 read_input 和 read_file 函数。希望对您有帮助。