0

我有一个 Java Iterable 对象,Iterable 记录。我想将它传递给 Beam 管道。我试过了

PCollection csvRecordPC = p.apply("创建集合", Create.of(records));

它导致了一个错误

执行 Java 类时发生异常。无法为没有元素的“创建”PTransform 确定默认编码器。添加元素,调用 Create.empty(Coder)、Create.empty(TypeDescriptor),或者在 PTransform 上调用 'withCoder(Coder)' 或 'withType(TypeDescriptor)'。

我应该使用哪个编码器?或者我该如何编写我的自定义编码器?

4

1 回答 1

2

我找到了使用 FileIO 的解决方案。

p.apply(FileIO.match().filepattern(options.getInputFile()))
 .apply(FileIO.readMatches())
 .apply(ParDo.of(new CsvParser())) 

CsvPaser() 是

public class CsvParser extends DoFn<ReadableFile, CSVRecord> {
    @DoFn.ProcessElement
    public void processElement(@Element ReadableFile element, DoFn.OutputReceiver<CSVRecord> receiver) throws IOException {
        InputStream is = Channels.newInputStream(element.open());

        Reader reader = new InputStreamReader(is);

        Iterable<CSVRecord> records = CSVFormat.EXCEL.withFirstRecordAsHeader().parse(reader);

        for (CSVRecord record : records) {
            receiver.output(record);
        }
    }
}
于 2019-03-01T04:46:01.910 回答