在旧版本scalding
中仍然没有counters
在其 API 中引入。 Hadoop Counters In Scalding建议如何回退到 scalding 中的级联计数器
def addCounter(pipe : Pipe, group : String, counter : String) = {
pipe.each(() -> ('addCounter)) ( fields =>
new BaseOperation[Any](fields) with Function[Any] {
def operate(flowProcess : FlowProcess[_],
functionCall : FunctionCall[Any]) {
try {
flowProcess.asInstanceOf[HadoopFlowProcess]
.increment(group, counter, 1L)
functionCall.getOutputCollector.add(new Tuple(new Array[Object](1) : _*))
} catch {
case cce: ClassCastException =>
// HadoopFlowProcess is not available in local mode
}
}.discard('addCounter)
}
)
}
但是,当我尝试得到:
Error:(74, 14) ';' expected but '.' found.
}.discard('addCounter)
^
我错过了什么吗?我使用的烫伤版本:0.8.7