我一直在使用 Beam SQL DSL,如果不提供手动了解输出模式的编码器,我将无法使用查询的输出。我可以推断输出模式而不是硬编码吗?
演练或示例实际上都没有使用查询的输出。我使用Scio而不是普通的 Java API 来保持代码相对可读和简洁,我认为这对这个问题没有影响。
这是我的意思的一个例子。
给定一个输入模式inSchema
和一些映射到 a 的数据源,Row
如下所示:(在此示例中,基于 Avro,但同样,我认为这并不重要):
sc.avroFile[Foo](args("input"))
.map(fooToRow)
.setCoder(inSchema.getRowCoder)
.applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
.saveAsTextFile(args("output"))
运行此管道会产生KryoException
如下结果:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
Serialization trace:
fieldIndices (org.apache.beam.sdk.schemas.Schema)
schema (org.apache.beam.sdk.values.RowWithStorage)
org.apache.beam.sdk.Pipeline$PipelineExecutionException:
com.esotericsoftware.kryo.KryoException: java.lang.NullPointerException
但是,插入一个RowCoder
匹配的 SQL 输出,在本例中是一个 count int 列:
...snip...
.applyTransform(SqlTransform.query("SELECT COUNT(1) FROM PCOLLECTION"))
.setCoder(Schema.builder().addInt64Field("count").build().getRowCoder)
.saveAsTextFile(args("output"))
现在管道运行得很好。
鉴于我们指定了输入模式/编码器和查询,必须手动告诉管道如何编码 SQL 输出似乎是不必要的。在我看来,我们应该能够从中推断出输出模式 - 但除了直接使用 Calcite 之外,我看不出如何?
在 Beam Jira 上开票之前,我想我会检查一下我没有遗漏一些明显的东西!