我需要处理一个(GCS)文件桶,其中每个文件都被压缩并包含一个多行 JSON 记录。此外,正在处理的文件的名称很重要,我需要在我的转换中知道它。
从文档中的示例开始,TextIO 看起来非常接近,但看起来它旨在逐行处理每个文件,并且不允许我一次读取整个文件。另外,我没有看到任何方法来获取正在处理的文件名?
PCollectionTuple results = p.apply(TextIO.Read
.from("gs://bucket/a/*.gz")
.withCompressionType(TextIO.CompressionType.GZIP)
.withCoder(MyJsonCoder.of()))
看起来我需要编写一个自定义的 IO 阅读器,或者类似的东西?关于最佳起点的任何提示?