2

我想要一个 Dataflow 模板,其中一个 PipelineOptions 参数具有默认值。

受在线示例的启发,我在我的 PipelineOptions“子”界面中使用 ValueProvider 占位符进行延迟参数设置:

  @Default.String("MyDefaultValue")
  ValueProvider<String> getMyValue();
  void setMyValue(ValueProvider<String> value);

如果我在运行时指定参数,则模板可用于启动真正的 GCP Dataflow 作业。但是,如果我在实际执行此操作之前尝试测试不包括参数:

@Rule
public TestPipeline pipeline = TestPipeline.create();
...
  
@Test
public void test() {
  PipelineOptions options = PipelineOptionsFactory.fromArgs(new String[] {...}).withValidation();
  ...
  pipeline.run(options);
}

然后,当我的 TestPipeline 执行需要参数的 DoFn processElement 方法时,我得到

IllegalStateException: Value only available at runtime, but accessed from a non-runtime context: 
RuntimeValueProvider{propertyName=myValue, default=MyDefaultValue}
...

更具体地说,它在 org.apache.beam.sdk.options.ValueProvider 中失败:

@Override
public T get() {
  PipelineOptions options = optionsMap.get(optionsId);
  if (options == null) {
    throw new IllegalStateException(...

认为运行时是管道运行时可能是可以原谅的。

无论如何,有人知道我将如何对默认参数进行单元测试,假设顶部代码片段应该如何设置并且受支持?谢谢你。

4

1 回答 1

0

当我从 Eclipse 生成数据流模板时,我遇到了同样的问题,我的数据流模板从 Cloud Composer DAG 接收参数。

我从 Google Cloud 文档中获得了解决方案: https ://cloud.google.com/dataflow/docs/guides/templates/creating-templates#using-valueprovider-in-your-functions

于 2020-08-05T21:26:13.553 回答