我正在尝试构建一个数据流模板。
目标是阅读 ValueProvider ,它会告诉我要阅读哪些文件。然后对于每个文件,我需要使用对象读取和丰富数据。我试过这个:
p.apply(Create.of(options.getScheduleBatch()))
.apply(ParDo.of(StringScheduleBatchToFileReceivedFn.of()))
.apply(ParDo.of(new DoFn<FileReceived, PCollection<EventRow>>() {
@ProcessElement
public void process(ProcessContext c) {
FileReceived fileReceived = c.element();
Broker broker = configuration.getBroker(fileReceived.getBrokerId());
PCollection<EventRow> eventRows = p
.apply(TextIO.read().from(fileReceived.getUri()))
.apply(ParDo.of(StringToEventRowFn.of(broker, fileReceived, options.getJobName())));
c.output(eventRows);
}
}));
但我有以下错误:
从 CoderRegistry 推断编码器失败:无法为 org.apache.beam.sdk.values.PCollection 提供编码器。
我很想找到一种比自己使用 gcs 客户端读取文件更好的方法。
你有什么建议吗 ?
此致