1

我正在尝试将包含多个值的 tablerow 转换为 KV。我可以在 DoFn 中实现这一点,但这会增加我想要进一步编写的代码的复杂性,并使我的工作更加困难。(基本上我需要对tablerow的两个pcollections执行CoGroupBy操作)

有什么方法可以将 PCollection 转换为 PCollection<KV<String, String>>,其中的键和值以与 tablerow 中相同的格式存储?

我写了一个看起来像这样的片段,但这并没有给我想要的结果,有什么方法可以加载 tablerow 中的所有条目并生成具有这些值的 KV?

ImmutableList<TableRow> input = ImmutableList.of(new TableRow().set("val1", "testVal1").set("val2", "testVal2").set("val3", "testVal3");
PCollection<TableRow> inputPC = p.apply(Create.of(input));

inputPC.apply(MapElements.into(TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                .via(tableRow -> KV.of((String) tableRow.get("val1"), (String) tableRow.get("val2"))));
4

2 回答 2

1

看起来您想要的是一种对从 BigQuery 获得的数据执行联接的方法。无法TableRows直接执行联接,这是因为TableRows通常不打算将其作为管道中的元素进行操作,它们的目的是专门用于使用 BigQuery IO 进行读取和写入。

为了能够使用现有的 Beam 转换,您需要将它们TableRows转换为更有用的表示形式,例如您自己编写的 Java 对象或 Beam 模式Row类型。由于TableRow本质上是 JSON 字符串的字典,因此您需要做的就是编写一个 Map 函数,该函数读取适当的类型并在必要时对其进行解析。例如:

PCollection<TableRow> tableRows = ... // Reading from BigQuery IO.
PCollection<Foo> foos = tableRows.apply(MapElements.via(
    new SimpleFunction<TableRow, Foo>() {
        @Override
        public Foo apply(TableRow row) {
            String bar = (String) row.get("bar");
            Integer baz = (Integer.parseInt((String) row.get("baz")));
            return new Foo(bar, baz);
        }
    });

一旦您获得了您选择的类型的数据,您就可以使用 find 一种方法来使用内置的 Beam 转换来执行 Join。有很多潜在的方法可以做到这一点,所以我不会列出所有方法,但一个明确的首选是Join 类

于 2021-11-05T02:23:13.723 回答
0

要将 PCollection TableRow 转换为 PCollection 字符串,您可以使用以下代码:

static class StringConverter extends DoFn<String, TableRow> {
@Override
public void processElement(ProcessContext c) { 
c.output(new TableRow().set("string_field", c.element())); }
}

在这里,您可以阅读有关如何从 TableRow 转换为 String 的更多信息。

于 2021-11-01T19:21:15.097 回答