0

我正在使用Apache's Beamsdk 版本并尝试使用runner0.2.0-incubating-SNAPSHOT 将数据拉到一个 bigtable 中。Dataflow不幸的是,我NullPointerException在执行用作接收器的数据流管道时遇到了问题BigTableIO.WriteBigtableOptions根据我的需要,已经检查了我的参数并没有问题。

基本上,我创建并在我的管道的某个点上,我有步骤将其写入PCollection<KV<ByteString, Iterable<Mutation>>>我想要的大表:

final BigtableOptions.Builder optionsBuilder =
    new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
        .setInstanceId(System.getProperty("BT_INSTANCE_ID"));

// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>> 
// to write to bigtable

// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
    .withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));

p.run();

在执行管道时,我得到了NullPointerException,在方法中准确地指出了 BigtableIO 类public void processElement(ProcessContext c)

(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)

我检查了此方法是否在处理所有元素之前在 bigtable 上写入,但不确定为什么我会超时执行此管道。根据下面的代码,此方法使用bigtableWriter属性来处理每个c.element(),但我什至无法设置断点来调试null. 有关如何解决此问题的任何建议或建议?

@ProcessElement
  public void processElement(ProcessContext c) throws Exception {
    checkForFailures();
    Futures.addCallback(
        bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
    ++recordsWritten;
  }

谢谢。

4

1 回答 1

2

0.3.0-incubating-SNAPSHOT我查找了作业及其类路径,如果我没记错的话beam-sdks-java-{core,io},看起来您使用的version 0.2.0-incubating-SNAPSHOTgoogle-cloud-dataflow-java.

我相信问题是因为这个 - 你必须使用相同的版本(更多细节:版本 0.3.0 中的 BigtableIO 使用@Setup@Teardown方法,但 runner 0.2.0 还不支持它们)。

于 2016-09-15T04:23:56.547 回答