1

我正在尝试将管道输出到不同的目录中,以便每个目录的输出将根据某些 id 进行分桶。所以在一个普通的 map reduce 代码中,我会使用 MultipleOutputs 类,我会在 reducer 中做类似的事情。

protected void reduce(final SomeKey key,
      final Iterable<SomeValue> values,
      final Context context) {

   ...
   for (SomeValue value: values) {
     String bucketId = computeBucketIdFrom(...);
     multipleOutputs.write(key, value, folderName + "/" + bucketId);
   ...

所以我想在烫伤时可以这样做

...
  val somePipe = Csv(in, separator = "\t",
        fields = someSchema,
        skipHeader = true)
    .read

  for (i <- 1 until numberOfBuckets) {
    somePipe
    .filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
    .write(Csv(out + "/bucket" + i ,
      writeHeader = true,
      separator = "\t"))
  }

但我觉得你最终会多次重复同一根管道,这会影响整体性能。

还有其他选择吗?

谢谢

4

1 回答 1

1

是的,当然有更好的方法使用TemplatedTsv

所以你上面的代码可以写成如下,

val somePipe = Tsv(in, fields = someSchema, skipHeader = true)
    .read
    .write(TemplatedTsv(out, "%s", 'some_id, writeHeader = true))

这会将来自 'some_id 的所有记录放入 out/some_ids 文件夹下的单独文件夹中。

但是,您也可以创建整数存储桶。把最后几行改一下

.map('some_id -> 'bucket) { id: String => id.hashCode % numberOfBuckets }    
.write(TemplatedTsv(out, "%02d", 'bucket, writeHeader = true, fields = ('all except 'bucket)))

这将创建两位数的文件夹作为 out/dd/。您还可以在此处查看 templatedTsv api。

使用 templatedTsv 可能会有一个小问题,即 reducer 会生成大量小文件,这可能对使用您的结果的下一项工作不利。因此,最好在写入磁盘之前对模板字段进行排序。我在这里写了一篇关于它的博客

于 2015-02-13T09:08:21.350 回答