2

我需要处理一个(GCS)文件桶,其中每个文件都被压缩并包含一个多行 JSON 记录。此外,正在处理的文件的名称很重要,我需要在我的转换中知道它。

从文档中的示例开始,TextIO 看起来非常接近,但看起来它旨在逐行处理每个文件,并且不允许我一次读取整个文件。另外,我没有看到任何方法来获取正在处理的文件名?

PCollectionTuple results = p.apply(TextIO.Read
    .from("gs://bucket/a/*.gz")
    .withCompressionType(TextIO.CompressionType.GZIP)
    .withCoder(MyJsonCoder.of()))

看起来我需要编写一个自定义的 IO 阅读器,或者类似的东西?关于最佳起点的任何提示?

4

1 回答 1

4

你是对的,现在没有一个现有的类完全符合你的要求。有2种合理的方法:

  • 自己匹配文件模式(使用IOChannelUtilsIOChannelFactory)并将生成的文件包装到PCollection<String>字符串将是文件名的地方,使用Create.of(filenames). 然后应用一个ParDo读取给定文件名的函数。
  • 编写您自己的Source子类(还有 FileBasedSource,但它不太适合您的用例)。它将由文件模式配置,并splitIntoBundles匹配文件模式并扩展为单独的源,每个源对应于一个文件。

我会推荐第一种方法,因为它看起来代码更少,而且您的用例不需要Source.

于 2015-03-31T16:14:17.910 回答