0

我正在用 Beam SQL 做一些实验。我PCollection<Row>从转换中得到 aSampleSource并将其输出传递给 a SqlTransform

String sql1 = "select c1, c2, c3 from PCOLLECTION where c1 > 1";

下面的代码运行没有任何错误。

POutput it = p.apply(new SampleSource()).apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();

但是,当我尝试以下代码行时,出现运行时错误。

POutput it = p.apply(new SampleSource());
it.getPipeline().apply(SqlTransform.query(sql1));
p.run().waitUntilFinish();

错误详细信息是

Caused by: org.apache.beam.repackaged.beam_sdks_java_extensions_sql.org.apache.calcite.sql.validate.SqlValidatorException: Object 'PCOLLECTION' not found

请提供一些指示。

4

1 回答 1

1

它不起作用,因为您将 aSqlTransform应用于管道,而不是PCollection.

您可能希望按照以下方式更改它:


// source probably returns a PCollection,
// would make sense to change 'it' to PCollection:
PCollection<...> it = p.apply(new SampleSource());

// then apply SqlTransform to the PCollection from the previous step,
// that is apply it directly to 'it':
it.apply(SqlTransform.query(sql1));

...

从高层次的角度来看,Beam 管道是如何工作的:

  • 创建管道;
  • 应用PTransform从某个源读取的 IO 并生成从源PColelction读取的某些元素;
  • 链式应用更多PTransforms来自PCollection上一步的数据来处理数据(从概念上讲,PCollections每一步都会产生不同的结果);
  • 重复;

SqlTransform是一个 normal PTransform,它应该被应用到一个PCollection元素上并PCollection作为结果输出另一个。您在其中指定的查询将SqlTransform.create()应用于PCollection. 它期望数据来自一个神奇的PCOLLECTION表,代表PCollection你应用的那个SqlTransform

您在示例中所做的是不同的:

  • 创建管道;
  • 应用PTransform产生 aPOutput不一定 a的源PCollection
  • 如果您的源代码,则忽略输出,而是采用原始管道并将 aSqlTransform直接应用于它;

所以发生的情况是,SqlTransform在这种情况下,它被应用于管道的“根”,而不是PCollection来自源的那个。而不是一个接一个地应用链,PTransforms您现在有两个PTransforms彼此独立地应用于根。

另一个需要注意的是,SqlTransform期望输入元素是Rows,因为 SQL 作为一种语言仅适用于表示为行的数据。有两种方法可以实现这一点:

  • 通过在源和之间Rows应用另一个元素,手动将源生成的元素转换为;ParDoSqlTransform
  • 使用 Beam 的Schema框架(例如 check outPCollection.setSchema()方法),它允许 Beam SQL 自动将输入元素转换为Rows;
于 2019-07-01T22:38:26.690 回答