开始试用 Apache Beam 并尝试使用它来读取和计算 HBase 表。当尝试在没有 Count.globally 的情况下读取表时,它可以读取行,但是当尝试计算行数时,进程挂起并且永远不会退出。
这是非常简单的代码:
Pipeline p = Pipeline.create(options);
p.apply("read", HBaseIO.read().withConfiguration(configuration).withTableId(HBASE_TABLE))
.apply(ParDo.of(new DoFn<Result, String>() {
@ProcessElement
public void processElement(ProcessContext c) {
Result result = c.element();
String rowkey = Bytes.toString(result.getRow());
System.out.println("row key: " + rowkey);
c.output(rowkey);
}
}))
.apply(Count.<String>globally())
.apply("FormatResults", MapElements.via(new SimpleFunction<Long, String>() {
public String apply(Long element) {
System.out.println("result: " + element.toString());
return element.toString();
}
}));
当使用 Count.globally 时,该过程永远不会完成。将其注释掉时,该过程会打印所有行。
有什么想法吗?