0

我正在使用来自 spotify 的scio来完成我的 Dataflow 作业。在上一个 scio 版本中,使用了新的 bigtable java api (com.google.bigtable.v2)

现在scio bigtable 入口点需要“RowFilter”来过滤而不是 Hbase“Scan”。有没有一种简单的方法可以将“Scan”转换为“RowFilter”?我在源代码中寻找适配器,但我不确定如何使用它。我没有找到可以轻松从 hbase api 迁移到“新”api 的文档。

我在需要转换的代码中使用的简单扫描:

val scan = new Scan()
scan.setRowPrefixFilter("helloworld".getBytes)
scan.addColumn("family".getBytes, "qualifier".getBytes)
scan.setMaxVersions()
4

1 回答 1

1

理论上,您可以将bigtable-hbase 依赖项添加到项目中并调用com.google.cloud.bigtable.hbase.adapters.Adapters.SCAN_ADAPTER.adapt(scan)将 转换Scan为 a RowFilter,或者更具体地说是 a[ReadRowsRequest][3]其中包含 a [RowFilter][4]。(这些链接指向那些包含变量和大量注释的对象的 protobuf 定义)。

也就是说,bigtable-hbase 依赖项添加了很多传递依赖项。我会在独立项目中使用 bigtable-hbase SCAN_ADAPTER,然后打印 RowFilter 以查看它是如何构造的。

在您提到的特定情况下,这RowFilter很简单,但可能会有其他复杂情况。您的扫描分为三个部分,因此我将详细说明如何实现它们:

  1. scan.setRowPrefixFilter("helloworld".getBytes). 这转换为 on 的开始键和结束键BigtableIO。“helloworld”是开始键,你可以用 计算结束键RowKeyUtil. calculateTheClosestNextRowKeyForPrefix。默认BigtableIO不公开设置开始键和设置结束键,因此必须更改 scio 版本才能使这些设置器公开。

  2. scan.addColumn("family".getBytes, "qualifier".getBytes)转换为用 aRowFilter加到 a 的两个 s (主要类似于 AND)。第一个将设置 familyNameRegexFilter,第二个将设置 columnNameRegexFilterRowFilterChainRowFilterRowFilter

  3. scan.setMaxVersions()转换为RowFilter带有cellsPerColumnLimitFilter集合的。它需要从#2 添加到链中。警告:如果您使用 a 的timestampRangeFilter或值过滤器RowFilter来限制列的范围,请确保将 a 放在cellsPerColumnLimitFilter链的末尾。

于 2017-06-21T15:34:30.923 回答