0

我最近发现了 Disco Project,与 Hadoop 相比,我非常喜欢它,但我有一个问题。我的项目是这样设置的(如果有帮助,我很乐意剪切/粘贴真实代码):

我的文件.py

from disco.core import Job, result_iterator
import collections, sys
from disco.worker.classic.func import chain_reader
from disco.worker.classic.worker import Params

def helper1():
   #do stuff

def helper2():
   #do stuff
.
.
.
def helperN():
   #do stuff

class A(Job):
   @staticmethod
   def map_reader(fd, params):
      #Read input file
      yield line

   def map(self, line, params):
      #Process lines into dictionary
      #Iterate dictionary
          yield k, v

   def reduce(self, iter, out, params):
      #iterate iter
      #Process k, v into dictionary, aggregating values
      #Process dictionry
      #Iterate dictionary
         out.add(k,v)

Class B(Job):

   map_reader = staticmethod(chain_reader)
   map = staticmethod(nop_map)

   reduce(self, iter, out, params):
      #Process iter
      #iterate results
         out.add(k,v)


if __name__ == '__main__':
   from myfile import A, B
   job1 = A().run(input=[input_filename], params=Params(k=k))
   job2 = B().run(input=[job1.wait()], params=Params(k=k))
   with open(output_filename, 'w') as fp:
        for count, line in result_iterator(job2.wait(show=True)):
            fp.write(str(count) + ',' + line + '\n')

我的问题是工作流程完全跳过了 A 的 reduce 并下降到 B 的 reduce。

有什么想法吗?

4

1 回答 1

0

这是一个简单但微妙的问题:我没有

show = True

对于工作1。出于某种原因,在为 job2 设置了 show 后,它向我展示了 job1 的 map() 和 map-shuffle() 步骤,所以由于我没有得到我期望的最终结果,并且输入到 job2 函数之一看起来是错误的,我立即得出结论,job1 步骤未正确运行(这得到了进一步的支持,即在我添加 job2 之前,我验证了 job1 输出的准确性)。

于 2016-01-02T20:26:25.160 回答