6

所以我的输入数据有两个字段/列:id1 & id2,我的代码如下:

TextLine(args("input"))
.read
.mapTo('line->('id1,'id2)) {line: String =>
    val fields = line.split("\t")
        (fields(0),fields(1))
}
.groupBy('id2){.size}
.write(Tsv(args("output")))

输出结果(我假设)两个字段:id2 * size。我有点想知道是否可以保留也与 id2 分组的 id1 值并将其添加为另一个字段?

4

1 回答 1

8

恐怕你不能以一种好的方式做到这一点。想想它在后台是如何工作的——它将要计数的数据分成块并将其发送到不同的进程,每个进程都计算它的块,然后一个减速器在最后将它们全部加起来。虽然每个进程都在计数,但它不知道整个大小,因此无法添加该字段。唯一的方法是在整个大小已知后返回并将其添加到数据中(即连接)。

如果每个组都适合内存(并且您可以配置内存),您可以:

Tsv(args("input"), ('id1, 'id2))
.groupBy('id2)(_.size.toList[(String, String)](('id1, 'id2) -> 'list))
.flatMapTo[(Iterable[(String, String)], Int), (String, String, Int)](('list, 'size) -> ('id1, 'id2, 'size)) {
  case (list, size) => list.map(record => (record._1, record._2, size))
}
.write(Tsv(args("output")))

但是如果您的系统没有足够的内存,您将不得不使用昂贵的连接。

备注:您可以使用 Tsv 代替 TextLine 后跟 mapTo 和拆分。

于 2013-09-09T14:06:36.830 回答