我正在尝试将管道输出到不同的目录中,以便每个目录的输出将根据某些 id 进行分桶。所以在一个普通的 map reduce 代码中,我会使用 MultipleOutputs 类,我会在 reducer 中做类似的事情。
protected void reduce(final SomeKey key,
final Iterable<SomeValue> values,
final Context context) {
...
for (SomeValue value: values) {
String bucketId = computeBucketIdFrom(...);
multipleOutputs.write(key, value, folderName + "/" + bucketId);
...
所以我想在烫伤时可以这样做
...
val somePipe = Csv(in, separator = "\t",
fields = someSchema,
skipHeader = true)
.read
for (i <- 1 until numberOfBuckets) {
somePipe
.filter('someId) {id: String => (id.hashCode % numberOfBuckets) == i}
.write(Csv(out + "/bucket" + i ,
writeHeader = true,
separator = "\t"))
}
但我觉得你最终会多次重复同一根管道,这会影响整体性能。
还有其他选择吗?
谢谢