我正在使用Apache's Beam
sdk 版本并尝试使用runner0.2.0-incubating-SNAPSHOT
将数据拉到一个 bigtable 中。Dataflow
不幸的是,我NullPointerException
在执行用作接收器的数据流管道时遇到了问题BigTableIO.Write
。BigtableOptions
根据我的需要,已经检查了我的参数并没有问题。
基本上,我创建并在我的管道的某个点上,我有步骤将其写入PCollection<KV<ByteString, Iterable<Mutation>>>
我想要的大表:
final BigtableOptions.Builder optionsBuilder =
new BigtableOptions.Builder().setProjectId(System.getProperty("PROJECT_ID"))
.setInstanceId(System.getProperty("BT_INSTANCE_ID"));
// do intermediary steps and create PCollection<KV<ByteString, Iterable<Mutation>>>
// to write to bigtable
// modifiedHits is a PCollection<KV<ByteString, Iterable<Mutation>>>
modifiedHits.apply("writting to big table", BigtableIO.write()
.withBigtableOptions(optionsBuilder).withTableId(System.getProperty("BT_TABLENAME")));
p.run();
在执行管道时,我得到了NullPointerException
,在方法中准确地指出了 BigtableIO 类public void processElement(ProcessContext c)
:
(6e0ccd8407eed08b): java.lang.NullPointerException at org.apache.beam.sdk.io.gcp.bigtable.BigtableIO$Write$BigtableWriterFn.processElement(BigtableIO.java:532)
我检查了此方法是否在处理所有元素之前在 bigtable 上写入,但不确定为什么我会超时执行此管道。根据下面的代码,此方法使用bigtableWriter
属性来处理每个c.element()
,但我什至无法设置断点来调试null
. 有关如何解决此问题的任何建议或建议?
@ProcessElement
public void processElement(ProcessContext c) throws Exception {
checkForFailures();
Futures.addCallback(
bigtableWriter.writeRecord(c.element()), new WriteExceptionCallback(c.element()));
++recordsWritten;
}
谢谢。