0

在旧版本scalding中仍然没有counters在其 API 中引入。 Hadoop Counters In Scalding建议如何回退到 scalding 中的级联计数器

def addCounter(pipe : Pipe, group : String, counter : String) = {

  pipe.each(() -> ('addCounter)) ( fields =>
    new BaseOperation[Any](fields) with Function[Any] {

      def operate(flowProcess : FlowProcess[_], 
        functionCall : FunctionCall[Any]) {

          try {
            flowProcess.asInstanceOf[HadoopFlowProcess]
              .increment(group, counter, 1L)
            functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
          } catch {
            case cce: ClassCastException =>
            // HadoopFlowProcess is not available in local mode
          }
      }.discard('addCounter)
    }
  )
}

但是,当我尝试得到:

Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^

我错过了什么吗?我使用的烫伤版本:0.8.7

4

1 回答 1

2

.discard是一个烫伤命令,因此应该.each与代码块中的另一个烫伤命令处于同一级别。试着把它放在最后一个右括号“)”之后。(您发布的代码中的倒数第二行。)

在这里,操作被链接到 RichPipe 管道,首先是each,然后是discard

pipe.each(...){predicate}.discard(...)
于 2015-01-24T08:15:21.263 回答