0

如何在 BEAM SQL 中的 GroupByKey 之前包含 Window.into 或 Window.triggering 转换?

我有以下 2 个表:

CREATE TABLE table1(
field1 varchar 
,field2 varchar
)

第二张桌子

CREATE TABLE table2(
field1 varchar
,field3 varchar
)

我正在将结果写在第三张表中

CREATE TABLE table3(
field1 varchar
,field3 varchar
)

前 2 个表正在从 kafka 流中读取数据,我正在对这些表进行连接并将数据插入到第三个表中,使用以下查询。前 2 个表是无界/无界的

INSERT INTO table3 
        (field1, 
         field3) 
SELECT a.field1, 
   b.field3 
FROM   table1 a 
   JOIN table2 b 
     ON a.field1 = b.field1 

我收到以下错误:

原因:java.lang.IllegalStateException:GroupByKey 不能在没有触发器的情况下应用于 GlobalWindow 中的无界 PCollection。在 GroupByKey 之前使用 Window.into 或 Window.triggering 转换。在 org.apache.beam.sdk.transforms.GroupByKey.applicableTo(GroupByKey.java:173) 在 org.apache.beam.sdk.transforms.GroupByKey.expand(GroupByKey.java:204) 在 org.apache.beam.sdk .transforms.GroupByKey.expand(GroupByKey.java:120) 在 org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537) 在 org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:472 ) 在 org.apache.beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:126) 上的 org.apache.beam.sdk.values.PCollection.apply(PCollection.java:286)。 beam.sdk.transforms.join.CoGroupByKey.expand(CoGroupByKey.java:74) 在 org.

4

1 回答 1

5

这是 Beam SQL 的当前实现限制。您需要定义窗口,然后加入每个窗口的输入

如何在 Beam SQL 中进行连接和窗口化的几个示例:

  • 带有HOP窗口和连接的复杂 SQL 查询;
  • 测试,它在 SQL 之外用 Java 定义一个窗口,然后应用连接查询;
  • 可以在此处找到其他窗口函数的示例;

背景

问题是由于通常难以为无界数据流定义 Join 操作,它不仅限于 Beam SQL。

例如,想象一下,当数据处理系统从 2 个源接收输入,然后必须匹配它们之间的记录。从高层次的角度来看,这样的系统必须保留它迄今为止看到的所有数据,然后对于每个新记录,它必须检查第二个输入源中的所有记录,看看那里是否有匹配。当您拥有有限且小型的数据源时,它可以正常工作。在简单的情况下,您可以将所有内容加载到内存中,匹配来自源的数据,生成输出。

使用流数据,您不能永远缓存它。如果数据永远不会停止出现怎么办?并且不清楚何时要发出数据。如果你有一个outer join操作,你什么时候决定你没有来自另一个输入的匹配记录?

例如,请参阅Beam 指南的无界部分的解释Beam 中的连接通常使用(Beam SQL 连接)在其之上实现。PCollectionsGroupByKeyCoGroupByKey

对于特定的管道,所有这些问题都可能得到解答,但在一般情况下很难解决。Beam SDK 和 Beam SQL 中的当前方法是将其委托给用户来解决具体的业务案例。Beam 允许用户决定将哪些数据聚合到一个窗口中,等待延迟数据的时间,以及何时发出结果。还有诸如状态单元计时器之类的东西可以进行更精细的控制。这允许程序员编写管道来明确定义行为并在一定程度上解决这些问题,但具有(很多)额外的复杂性。

Beam SQL 是在常规 Beam SDK 概念之上实现的,并且受到相同的限制。但它有更多自己的实现。例如,您没有定义触发器、状态或自定义窗口的 SQL 语法。或者您不能编写ParDo可以在外部服务中保持状态的自定义。

于 2018-06-13T23:49:15.390 回答