0

我正在尝试构建一个数据流模板。

目标是阅读 ValueProvider ,它会告诉我要阅读哪些文件。然后对于每个文件,我需要使用对象读取和丰富数据。我试过这个:

        p.apply(Create.of(options.getScheduleBatch()))
            .apply(ParDo.of(StringScheduleBatchToFileReceivedFn.of()))
            .apply(ParDo.of(new DoFn<FileReceived, PCollection<EventRow>>() {
                @ProcessElement
                public void process(ProcessContext c) {
                    FileReceived fileReceived = c.element();
                    Broker broker = configuration.getBroker(fileReceived.getBrokerId());
                    PCollection<EventRow> eventRows = p
                            .apply(TextIO.read().from(fileReceived.getUri()))
                            .apply(ParDo.of(StringToEventRowFn.of(broker, fileReceived, options.getJobName())));
                    c.output(eventRows);
                }
            }));

但我有以下错误:

从 CoderRegistry 推断编码器失败:无法为 org.apache.beam.sdk.values.PCollection 提供编码器。

我很想找到一种比自己使用 gcs 客户端读取文件更好的方法。

你有什么建议吗 ?

此致

4

1 回答 1

1

问题:

您正在尝试发出 aPCollection作为您的ParDo. 这行不通。

细节:

PCollection是一种抽象,表示可能无限的元素集合。将转换应用于 aPCollection会给您另一个PCollection. 您可以应用的转换之一是ParDo. ParDos进行元素转换。当应用ParDo你正在表达的 - “PCollection通过应用它来转换其中的所有元素来制作另一个ParDo”。

使处理有效的一件事是能够并行执行所有内容,例如,通过ParDo在每个执行节点上针对不同元素运行相同的元素,在多个执行节点(例如 VM/机器)上一次转换大量元素。而且您无法明确控制任何特定转换是否会发生在同一个执行节点或另一个执行节点上,如何优化它是底层系统设计的一部分。但是要实现这一点,您必须能够在执行节点之间传递元素并将它们持久化以进行聚合。Beam 通过要求您实施来支持这一点Coders对于元素。编码器是一种序列化机制,它允许 Beam 将元素(由 java 对象表示)转换为字节数组,然后可以将其传递给下一个转换(可能发生在另一台机器上)或存储。例如,Beam 需要能够对您从 a 输出的元素进行编码ParDo。Beam 知道如何序列化某些类型,但它不能自动推断所有内容,您必须为无法推断的内容提供编码器。

您的示例如下所示:获取 some ,并通过将 a应用于每个元素PCollection将其转换为另一个,然后将每个输入元素转换为. 这意味着一旦元素被 a 处理,您必须对其进行编码并将其传递给下一个转换。这里的问题是 - 您如何编码并将(可能无界的)传递给下一个转换或将其持久化以进行聚合?PCollectionParDoParDoPCollectionParDoPCollection

Beam 目前不支持此功能,因此您需要选择另一种方法。

在您的特定情况下,我不确定是否可以在 Beam 开箱即用中简单地使用文件名流并将它们转换为子管道来处理文件中的行。

解决方法:

我能想到的几种方法可以绕过这个限制:

  • 如果你的文件名有一个已知的模式,你可以指定模式,TextIO它可以在新文件到达时读取它们。

  • 如果它们没有已知的模式,您可能会编写另一个管道来重命名文件名,以便它们具有通用名称模式,然后TextIO在另一个管道中使用该模式。

  • 如果可行(例如文件适合内存),您可能可以使用纯 java FileAPI 读取文件内容,将它们拆分为行并在单个ParDo. 然后你可以StringToEventRowFn在下面应用相同的ParDo

希望这可以帮助

于 2018-11-08T19:33:13.653 回答