2

我正在关注这篇文章文档的答案,以便在管道末端对我的数据执行动态窗口写入。这是我到目前为止所拥有的:

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(
        FileIO.<String, String>writeDynamic()
            .by(Event::getKey)
            .via(TextIO.sink())
            .to("gs://some_bucket/events/")
            .withNaming(key -> defaultNaming(key, ".json")));
}

但是 NetBeans 在最后一行警告我一个语法错误:

FileNaming is not public in Write; cannot be accessed outside package

如何使defaultNaming我的管道可用,以便我可以将其用于动态写入。或者,如果那不可能,我应该怎么做?

4

1 回答 1

5

发布我发现的内容,以防其他人遇到此问题。

我之前尝试使用的方式存在三个问题writeDynamic()

  1. 以前我一直在使用 Beam 版本 2.3.0,它确实描述FileNamingFileIO.Write. Beam 2.4.0 定义FileNamingpublic static interface使其在外部可用。
  2. 完全解析/导入defaultNaming. 而不是defaultNaming直接调用——就像在示例文档中调用的那样——它必须像我实际导入的包FileIO.Write.defaultNaming一样被调用。FileIO
  3. 添加withDestinationCoder也需要执行动态写入。

最终的解决方案最终看起来像这样。

static void applyWindowedWrite(PCollection<String> stream) {
    stream.apply(FileIO.<String, String>writeDynamic()
                .by(Event::getKey)
                .via(TextIO.sink())
                .to("gs://some_bucket/events/")
                .withDestinationCoder(StringUtf8Coder.of())
                .withNumShards(1)
                .withNaming(key -> FileIO.Write.defaultNaming(key, ".json")));
}

whereEvent::getKey是与签名在同一包中定义的静态函数public static String getKey(String event)

这将执行一个窗口写入,每个窗口写入一个文件(由.withNumShards(1)方法定义)。这假定窗口已在上一步中定义。AGroupByKey在写入之前不是必需的,因为只要明确定义分片数量,它就会在写入过程中完成。有关“Writing files -> How many shards are generated per pane”下的更多详细信息,请参阅FileIO 文档

于 2018-05-08T19:58:47.403 回答