3

我正在groupBy计算一个值,但似乎当我分组时,我丢失了所有不在聚合键中的字段:

filtered.filterNot('site) {s:String => ...}
        .filterNot('date) {s:String => ...}
aggr = filtered.groupBy('id, 'contentHost) { group =>
    group.min('timestamp -> 'min)
    //how do I keep original fields? (eg: site, date)
}

aggr.store(Tsv(...)) //eg: field "site" won't be here

在猪中,它会是这样的:

aggr = group filtered by concat('id, 'contentHost);

result = foreach aggr {
  generate flatten(filtered), //how to do this in scalding?
           min(filtered.timestamp) as min;
}
4

1 回答 1

4

我对元组 API 也有同样的问题,只能通过使用类型化 API 来解决。

您可以使用 Scala 元组或在工作之外定义自己的案例类。例如:

case class Data(id: String, site: String, date: String, contentHost: String)

然后你会像这样处理它:

val filtered: TypedPipe[Data] = TypedPipe.from(Seq(Data("...", "2014-04-14", "...", "...")))

filtered
  .filterNot ( data => data.site == "fr" )
  .filterNot ( data => data.date == "2014-02-01" )
  .groupBy (data => (data.id, data.contentHost)) // (String,String) -> Data
  .min // or .minBy { ... }
  .toTypedPipe
  .write(TypedTsv[((String, String), Data)]("/path/"))
于 2014-04-14T13:16:12.317 回答