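I've seen this question: Scalding: how to retain the other field after a groupBy('field){.size}?
Compared to Apache Pig, this is a real pain and a mess... What am I doing wrong? Can I do something like Pig's GENERATE(FLATTEN())?
I'm confused. Here is my Scalding code: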
def takeTop(topAmount: Int): Pipe = self
  .groupBy(person1) { _.sortedReverseTake[Long](activityCount -> top, topAmount) }
  .flattenTo[(Long, Long, Long)](top -> (person1, person2, activityCount))
My test:
"Take top 3" should "return most active pairs" in {
  Given {
    List( (1, 13, 7),
          (1, 13, 8),
          (1, 12, 9),
          (1, 11, 10),
          (2, 20, 21),
          (2, 20, 22)) withSchema (person1, person2, activityCount)
  } When {
    pipe: RichPipe => pipe.takeTop(3)
  } Then {
    buffer: mutable.Buffer[(Long, Long, Long)] =>
      println(buffer.toList)
      buffer.toList.size should equal(5)
      println(buffer.toList)
      buffer.toList should contain (1, 11, 10)
      buffer.toList should contain (1, 12, 9)
      buffer.toList should contain (1, 13, 8)
      buffer.toList should not contain (1, 13, 7)
      buffer.toList should contain (2, 20, 21)
      buffer.toList should contain (2, 20, 22)
  }
}
I get an exception at runtime:
14/09/23 15:25:57 ERROR stream.TrapHandler: caught Throwable, no trap available, rethrowing
cascading.pipe.OperatorException: [com.twitter.scalding.T...][com.twitter.scalding.RichPipe.eachTo(RichPipe.scala:478)] operator Each failed executing operation
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:107)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:39)
at cascading.flow.stream.CloseReducingDuct.completeGroup(CloseReducingDuct.java:47)
at cascading.flow.stream.AggregatorEveryStage$1.collect(AggregatorEveryStage.java:67)
at cascading.tuple.TupleEntryCollector.safeCollect(TupleEntryCollector.java:145)
at cascading.tuple.TupleEntryCollector.add(TupleEntryCollector.java:133)
at com.twitter.scalding.MRMAggregator.complete(Operations.scala:321)
at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:151)
at cascading.flow.stream.AggregatorEveryStage.completeGroup(AggregatorEveryStage.java:39)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:51)
at cascading.flow.stream.OpenReducingDuct.receive(OpenReducingDuct.java:28)
at cascading.flow.local.stream.LocalGroupByGate.complete(LocalGroupByGate.java:113)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.Duct.complete(Duct.java:81)
at cascading.flow.stream.OperatorStage.complete(OperatorStage.java:296)
at cascading.flow.stream.SourceStage.map(SourceStage.java:105)
at cascading.flow.stream.SourceStage.call(SourceStage.java:53)
at cascading.flow.stream.SourceStage.call(SourceStage.java:38)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303)
at java.util.concurrent.FutureTask.run(FutureTask.java:138)
at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)
at java.lang.Thread.run(Thread.java:662)
Caused by: java.lang.ClassCastException: java.lang.Long cannot be cast to scala.Tuple3
at com.twitter.scalding.GeneratedTupleSetters$$anon$25.apply(GeneratedConversions.scala:669)
at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:47)
at com.twitter.scalding.FlatMapFunction$$anonfun$operate$2.apply(Operations.scala:46)
at scala.collection.immutable.List.foreach(List.scala:318)
at com.twitter.scalding.FlatMapFunction.operate(Operations.scala:46)
at cascading.flow.stream.FunctionEachStage.receive(FunctionEachStage.java:99)
... 23 more
What am I doing wrong?
UPD:
This is how I did it:
def takeTop(topAmount: Int): Pipe = self
  .groupBy(person1) { _.sortedReverseTake[(Long, Long, Long)]((activityCount, person1, person2) -> top, topAmount) }
  .flattenTo[(Long, Long, Long)](top -> (activityCount, person1, person2))
  .project(person1, person2, activityCount)
The test passes, but I'm not sure this is a good approach...
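
For comparison, here is a minimal sketch in plain Scala collections (not Scalding) of what I expect the updated takeTop to compute: group by person1, order each group by the whole (activityCount, person1, person2) tuple in descending order, keep the first topAmount rows, and flatten back to rows. The object name TakeTopSketch and the Row alias are made up for illustration. It only mirrors the intended semantics and says nothing about how Scalding executes the pipeline.

```scala
object TakeTopSketch {
  // Rows are (person1, person2, activityCount), as in the test data.
  type Row = (Long, Long, Long)

  // Plain-collections analogue of the updated takeTop: for each person1,
  // keep the topAmount rows with the highest activityCount.
  def takeTop(rows: Seq[Row], topAmount: Int): Seq[Row] =
    rows
      .groupBy(_._1)   // group by person1
      .toSeq
      .sortBy(_._1)    // deterministic group order, for printing only
      .flatMap { case (_, group) =>
        // activityCount goes first in the sort key so the tuple ordering
        // ranks rows by count, as in the Scalding version.
        group.sortBy(r => (r._3, r._1, r._2)).reverse.take(topAmount)
      }

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(
      (1L, 13L, 7L), (1L, 13L, 8L), (1L, 12L, 9L),
      (1L, 11L, 10L), (2L, 20L, 21L), (2L, 20L, 22L))
    takeTop(rows, 3).foreach(println)
  }
}
```

On the test data this keeps five rows and drops (1, 13, 7), which is what the test asserts. It also shows why the whole tuple has to go through the sort: if only activityCount is kept per group, there is nothing left to rebuild the (person1, person2, activityCount) triple from.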