0

我有一个基本的 Beam 管道,它从 GCS 读取数据,执行 Beam SQL 转换并将结果写入 BigQuery。

当我不在我的 SQL 语句中进行任何聚合时,它可以正常工作:

..
PCollection<Row> outputStream =
                sqlRows.apply(
                        "sql_transform",
                        SqlTransform.query("select views from PCOLLECTION"));
outputStream.setCoder(SCHEMA.getRowCoder());
..

但是,当我尝试用总和进行聚合时,它会失败(抛出CannotPlanException异常):

..
PCollection<Row> outputStream =
                sqlRows.apply(
                        "sql_transform",
                        SqlTransform.query("select wikimedia_project, sum(views) from PCOLLECTION group by wikimedia_project"));
outputStream.setCoder(SCHEMA.getRowCoder());
..

堆栈跟踪:

Step #1: 11:47:37,562 0    [main] INFO  org.apache.beam.runners.dataflow.DataflowRunner - PipelineOptions.filesToStage was not specified. Defaulting to files from the classpath: will stage 117 files. Enable logging at DEBUG level to see which files will be staged.
Step #1: 11:47:39,845 2283 [main] INFO  org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQL:
Step #1: SELECT `PCOLLECTION`.`wikimedia_project`, SUM(`PCOLLECTION`.`views`)
Step #1: FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
Step #1: GROUP BY `PCOLLECTION`.`wikimedia_project`
Step #1: 11:47:40,387 2825 [main] INFO  org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner - SQLPlan>
Step #1: LogicalAggregate(group=[{0}], EXPR$1=[SUM($1)])
Step #1:   BeamIOSourceRel(table=[[beam, PCOLLECTION]])
Step #1: 
Step #1: Exception in thread "main" org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.RelOptPlanner$CannotPlanException: Node [rel#7:Subset#1.BEAM_LOGICAL.[]] could not be implemented; planner state:
Step #1: 
Step #1: Root: rel#7:Subset#1.BEAM_LOGICAL.[]
Step #1: Original rel:
Step #1: LogicalAggregate(subset=[rel#7:Subset#1.BEAM_LOGICAL.[]], group=[{0}], EXPR$1=[SUM($1)]): rowcount = 10.0, cumulative cost = {11.375000476837158 rows, 0.0 cpu, 0.0 io}, id = 5
Step #1:   BeamIOSourceRel(subset=[rel#4:Subset#0.BEAM_LOGICAL.[]], table=[[beam, PCOLLECTION]]): rowcount = 100.0, cumulative cost = {100.0 rows, 101.0 cpu, 0.0 io}, id = 2
Step #1: 
Step #1: Sets:
Step #1: Set#0, type: RecordType(VARCHAR wikimedia_project, BIGINT views)
Step #1:        rel#4:Subset#0.BEAM_LOGICAL.[], best=rel#2, importance=0.81
Step #1:                rel#2:BeamIOSourceRel.BEAM_LOGICAL.[](table=[beam, PCOLLECTION]), rowcount=100.0, cumulative cost={100.0 rows, 101.0 cpu, 0.0 io}
Step #1:        rel#10:Subset#0.ENUMERABLE.[], best=rel#9, importance=0.405
Step #1:                rel#9:BeamEnumerableConverter.ENUMERABLE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[]), rowcount=100.0, cumulative cost={1.7976931348623157E308 rows, 1.7976931348623157E308 cpu, 1.7976931348623157E308 io}
Step #1: Set#1, type: RecordType(VARCHAR wikimedia_project, BIGINT EXPR$1)
Step #1:        rel#6:Subset#1.NONE.[], best=null, importance=0.9
Step #1:                rel#5:LogicalAggregate.NONE.[](input=rel#4:Subset#0.BEAM_LOGICAL.[],group={0},EXPR$1=SUM($1)), rowcount=10.0, cumulative cost={inf}
Step #1:        rel#7:Subset#1.BEAM_LOGICAL.[], best=null, importance=1.0
Step #1:                rel#8:AbstractConverter.BEAM_LOGICAL.[](input=rel#6:Subset#1.NONE.[],convention=BEAM_LOGICAL,sort=[]), rowcount=10.0, cumulative cost={inf}
Step #1: 
Step #1: 
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset$CheapestPlanReplacer.visit(RelSubset.java:448)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.RelSubset.buildCheapestPlan(RelSubset.java:298)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.plan.volcano.VolcanoPlanner.findBestExp(VolcanoPlanner.java:666)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.tools.Programs$RuleSetProgram.run(Programs.java:368)
Step #1:        at org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.prepare.PlannerImpl.transform(PlannerImpl.java:336)
Step #1:        at org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner.convertToBeamRel(BeamQueryPlanner.java:138)
Step #1:        at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:105)
Step #1:        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:96)
Step #1:        at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
Step #1:        at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
Step #1:        at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
Step #1:        at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:338)
Step #1:        at org.polleyg.TemplatePipeline.main(TemplatePipeline.java:59)
Step #1: :run FAILED
Step #1: 
Step #1: FAILURE: Build failed with an exception.

我正在使用 Beam 2.6.0

我错过了一些明显的东西吗?

4

1 回答 1

1

This should work, it's a bug. Filed BEAM-5384.

If you look at the plan, it has the LogicalAggregate operation that represents the aggregation and needs to be implemented by Beam. Due to how Beam works, to implement an aggregation it also needs to pull some information from LogicalProject operation that represents field access in select f1, f2, and that's what is missing here. It is not very clear yet whether it is a bug where the query is overly optimized and projection is removed from the plan, or whether it is a valid use case that Beam should support.

One suggestion I have is to try to modify the select clause, e.g. re-order fields, add more fields.

Update:

至少有一个问题导致了这种情况。基本上,当您的架构只有您在查询中使用的字段时,就不需要投影并且 Calcite 不会将其添加到计划中。然而,Beam 聚合需要一个投影节点来从中提取窗口信息(这是当前的实现,它可能不是正确的做法)。

解决方法: 因此,为了修复特定查询,您可以将额外字段添加到模式中而不在查询中使用它们,这将导致 Calcite 将投影节点添加到计划中,并且将应用 Beam SQL 聚合。

Beam HEAD 现在修复了这个特定问题:https ://github.com/apache/beam/commit/8c35781d62846211e43b6b122b557f8c3fdaec6d#diff-4f4ffa265fe666e99c37c346d50da67dR637

于 2018-09-13T21:36:35.073 回答