我有一个 CSV 文件,但我事先不知道列名。在 Google Dataflow 中进行一些转换后,我需要以 JSON 格式输出数据。
获取标题行并将标签渗透到所有行的最佳方法是什么?
例如:
a,b,c
1,2,3
4,5,6
...变成(大约):
{a:1, b:2, c:3}
{a:4, b:5, c:6}
我有一个 CSV 文件,但我事先不知道列名。在 Google Dataflow 中进行一些转换后,我需要以 JSON 格式输出数据。
获取标题行并将标签渗透到所有行的最佳方法是什么?
例如:
a,b,c
1,2,3
4,5,6
...变成(大约):
{a:1, b:2, c:3}
{a:4, b:5, c:6}
您应该实现自定义FileBasedSource(类似于TextIO.TextSource),它将读取第一行并存储标题数据
@Override
protected void startReading(final ReadableByteChannel channel)
throws IOException {
lineReader = new LineReader(channel);
if (lineReader.readNextLine()) {
final String headerLine = lineReader.getCurrent().trim();
header = headerLine.split(",");
readingStarted = true;
}
}
后者,在读取其他行时将其添加到当前行数据:
@Override
protected boolean readNextRecord() throws IOException {
if (!lineReader.readNextLine()) {
return false;
}
final String line = lineReader.getCurrent();
final String[] data = line.split(",");
// assumes all lines are valid
final StringBuilder record = new StringBuilder();
for (int i = 0; i < header.length; i++) {
record.append(header[i]).append(":").append(data[i]).append(", ");
}
currentRecord = record.toString();
return true;
}
我已经实现了一个快速(完整)的解决方案,可以在github上找到。我还添加了一个数据流单元测试来演示阅读:
@Test
public void test_reading() throws Exception {
final File file =
new File(getClass().getResource("/sample.csv").toURI());
assertThat(file.exists()).isTrue();
final Pipeline pipeline = TestPipeline.create();
final PCollection<String> output =
pipeline.apply(Read.from(CsvWithHeaderFileSource.from(file.getAbsolutePath())));
DataflowAssert
.that(output)
.containsInAnyOrder("a:1, b:2, c:3, ", "a:4, b:5, c:6, ");
pipeline.run();
}
其中sample.csv
有以下内容:
a,b,c
1,2,3
4,5,6
我已经根据 Luka 的源代码创建了一个解决方案(请参阅上一个答案)。Luka 在 github 中的代码用于 dataflow-1.x,并实现了 FileBasedSource,它提取第一行并将其缓存,然后将其添加到后面的每一行。这需要在单个节点上处理整个文件(不可拆分)。
我的 FileBasedSource 变体只是返回文件的第一行;如 javadoc 类中所述,该行可以被拆分(根据需要)并用作处理完整文件的逻辑的侧输入(然后可以并行完成)。该代码与 Beam 2.x 兼容(在 Beam 2.4.0 上测试)。
我正在使用 Luka 的阅读器,它在启动其他链式管道之前读取整个 csv 文件。是否可以定义块大小,例如读取 10 行处理写入,然后读取接下来的 10 行
PCollection<String> input = pipeline.apply(Read.from(CustomCsvReader.from(options.getInput())));
PCollection<Map<String,String>> mapOutput = input.apply(MapElements.via(new SimpleFunction<String, Map<String,String>>() {
@Override
public Map<String,String> apply(String input) {
String[] entrys = input.split(",");
return Stream.of(entrys).map(t -> t.split(":",2)).collect(Collectors.toMap(a -> a[0], a -> a.length > 1 ? a[1]: ""));
}
}));
PCollection<String> output = mapOutput.apply(ParDo.of(new CSVToXMLConverter()));
output.apply(TextIO.write().to(options.getOutput()).withFooter(Constants.CCR_FOOTER));
pipeline.run().waitUntilFinish();