0

我从文档和这个答案中都读到了可以动态确定表目标的信息。我使用了完全相同的方法,如下所示:

PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
  @Override
  public TableDestination apply(ValueInSingleWindow<Foo> value) {  
    Foo foo = value.getValue();
    // Also available: value.getWindow(), getTimestamp(), getPane()
    String tableSpec = ...;
    String tableDescription = ...;
    return new TableDestination(tableSpec, tableDescription);
  }
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
  @Override
  public TableRow apply(Foo foo) {
    return ...;
  }
}).withSchema(...));

但是,我收到以下编译错误:

The method to(String) in the type BigQueryIO.Write<Object> is not applicable for the arguments (new SerializableFunction<ValueInSingleWindow<Foo>,TableDestination>(){})

任何帮助,将不胜感激。

编辑以澄清我如何在我的案例中使用窗口:

PCollection<Foo> validFoos = ...;           
PCollection<TableRow> validRows = validFoos.apply(ParDo.named("Convert Foo to table row")
        .of(new ConvertToValidTableRowFn()))
        .setCoder(TableRowJsonCoder.of());
TableSchema validSchema = ConvertToValidTableRowFn.getSchema();    

validRows.apply(Window.<TableRow>into(CalendarWindows.days(1))).apply(BigQueryIO.writeTableRows()
        .to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
            @Override
            public TableDestination apply(ValueInSingleWindow<TableRow> value) {
                TableRow t = value.getValue();
                String fooName = ""; // get name from table
                TableDestination td = new TableDestination(
                        "my-project:dataset.table$" + fooName, "");
                return td;
            }
        }));

在这种情况下,我收到以下错误The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>)

4

1 回答 1

0

我相信编译错误来自您在 上执行此操作的事实PCollection<Foo>,而实际上它需要窗口值。所以你应该首先使用.apply(Window.<Foo>into(...)),然后根据你的窗口确定表格目的地。

您可以在this answerthis answer以及您提到的文档中查看示例。

于 2017-06-01T09:08:50.037 回答