我从文档和这个答案中都读到了可以动态确定表目标的信息。我使用了完全相同的方法,如下所示:
PCollection<Foo> foos = ...;
foos.apply(BigQueryIO.write().to(new SerializableFunction<ValueInSingleWindow<Foo>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<Foo> value) {
Foo foo = value.getValue();
// Also available: value.getWindow(), getTimestamp(), getPane()
String tableSpec = ...;
String tableDescription = ...;
return new TableDestination(tableSpec, tableDescription);
}
}).withFormatFunction(new SerializableFunction<Foo, TableRow>() {
@Override
public TableRow apply(Foo foo) {
return ...;
}
}).withSchema(...));
但是,我收到以下编译错误:
The method to(String) in the type BigQueryIO.Write<Object> is not applicable for the arguments (new SerializableFunction<ValueInSingleWindow<Foo>,TableDestination>(){})
任何帮助,将不胜感激。
编辑以澄清我如何在我的案例中使用窗口:
PCollection<Foo> validFoos = ...;
PCollection<TableRow> validRows = validFoos.apply(ParDo.named("Convert Foo to table row")
.of(new ConvertToValidTableRowFn()))
.setCoder(TableRowJsonCoder.of());
TableSchema validSchema = ConvertToValidTableRowFn.getSchema();
validRows.apply(Window.<TableRow>into(CalendarWindows.days(1))).apply(BigQueryIO.writeTableRows()
.to(new SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination>() {
@Override
public TableDestination apply(ValueInSingleWindow<TableRow> value) {
TableRow t = value.getValue();
String fooName = ""; // get name from table
TableDestination td = new TableDestination(
"my-project:dataset.table$" + fooName, "");
return td;
}
}));
在这种情况下,我收到以下错误The method apply(PTransform<? super PCollection<TableRow>,OutputT>) in the type PCollection<TableRow> is not applicable for the arguments (Window<TableRow>)
。