0

如果我想将 TableRow 或 String 的 SCollection 输出到谷歌云存储 (GCS),我分别使用 saveAsTableRowJsonFile 或 saveAsTextFile。这两种方法最终都使用

private[scio] def pathWithShards(path: String) = path.replaceAll("\\/+$", "") + "/part" 

它强制文件名以“part”开头。是通过使用 saveAsCustomOutput 输出自定义分片文件的唯一方法吗?

4

2 回答 2

3

我必须通过 saveAsCustomOutput 在梁代码中做到这一点

import org.apache.beam.sdk.util.Transport
val jsonFactory: JsonFactory = Transport.getJsonFactory
val outputPath = "gs://foo/bar_" // file prefix will be bar_
@BigQueryType.toTable()
case class Clazz(foo: String, bar: String)
val collection: SCollection[Clazz] = ....
collection.map(Clazz.toTableRow).
          map(jsonFactory.toString).
          saveAsCustomOutput(name = "CustomWrite", io.TextIO.write()
            .to(outputPath)
            .withSuffix("")
            .withWritableByteChannelFactory(FileBasedSink.CompressionType.GZIP))
于 2018-06-21T16:58:18.740 回答
0

Scio 的SCollection#saveAs*API 是对模仿其他流行系统行为的通用接收器的自以为是的包装器,在这种情况下,是 Hadoop Map/Reduce 前缀输出文件的方式。SCollection#saveAsCustomOutput如果您想直接访问较低级别的 Beam API,那么这是正确的方法。

于 2018-09-21T07:23:00.923 回答