
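I am trying to unnest a nested type in a PCollection using Beam SQL. Assume a PCollection of employees with their details, where the details are held in a nested collection. With the Beam SQL query "SELECT PCOLLECTION.details FROM PCOLLECTION", I get the nested details back in a separate PCollection as a collection of arrays. But when I try to select a specific column from the nested details, I get an error saying the column name cannot be found. I then tried a query similar to BigQuery SQL, "SELECT X.address FROM PCOLLECTION, Unnest(details) as X", and got a NullPointerException. I am using Apache Beam 2.12.0.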

I would appreciate any help with this.

Below is sample data for the nested details value. Each detail has email and phone columns, and each row holds a list of n details; this row has two:

WARNING: printValue:Row:[[Row:[lourdurajan@gmail.com, 9840618047], Row:[lourdurajan@sanmina.com, 9840618047]]]
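
To make the intended result concrete, here is a minimal sketch in plain Java (no Beam API involved) of what the UNNEST query is expected to produce for the sample row above: one output value per element of the nested details list. The class and method names (UnnestSketch, Employee, Detail, unnestEmails) are hypothetical and exist only for illustration, and only the details column is modeled because the other columns are not shown in the sample.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class UnnestSketch {

    /** Hypothetical stand-in for one element of the nested details array. */
    static final class Detail {
        final String email;
        final String phone;

        Detail(String email, String phone) {
            this.email = email;
            this.phone = phone;
        }
    }

    /** Hypothetical stand-in for one input row; only the details column is modeled. */
    static final class Employee {
        final List<Detail> details;

        Employee(List<Detail> details) {
            this.details = details;
        }
    }

    /**
     * Emits one email per (row, detail) pair, which is the result expected from
     * SELECT X.email FROM PCOLLECTION, UNNEST(PCOLLECTION.details) AS X.
     */
    static List<String> unnestEmails(List<Employee> rows) {
        List<String> emails = new ArrayList<>();
        for (Employee row : rows) {
            for (Detail detail : row.details) {
                emails.add(detail.email);
            }
        }
        return emails;
    }

    public static void main(String[] args) {
        // The single sample row from the log line above, with its two details.
        Employee row = new Employee(Arrays.asList(
                new Detail("lourdurajan@gmail.com", "9840618047"),
                new Detail("lourdurajan@sanmina.com", "9840618047")));

        for (String email : unnestEmails(Arrays.asList(row))) {
            System.out.println(email);
        }
    }
}
```

The nested loop is the whole idea: UNNEST joins each input row with the elements of its own array, so a row with two details contributes two output rows.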

Here is the Java stack trace for the second SELECT statement:

SELECT `X`.`email`
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`,
UNNEST(`PCOLLECTION`.`details`) AS `X`
May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: SQLPlan>
LogicalProject(email=[$3])
  LogicalCorrelate(correlation=[$cor0], joinType=[inner], requiredColumns=[{2}])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])
    Uncollect
      LogicalProject(details=[$cor0.details_2])
        LogicalValues(tuples=[[{ 0 }]])

May 08, 2019 11:23:30 AM org.apache.beam.sdk.extensions.sql.impl.BeamQueryPlanner convertToBeamRel
INFO: BEAMPlan>
BeamCalcRel(expr#0..4=[{inputs}], email=[$t3])
  BeamUnnestRel(unnestIndex=[2])
    BeamIOSourceRel(table=[[beam, PCOLLECTION]])

[WARNING] 
java.lang.NullPointerException
    at org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils.toSchema(CalciteUtils.java:171)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:93)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamUnnestRel$Transform.expand(BeamUnnestRel.java:87)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:66)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.lambda$buildPCollectionList$0(BeamSqlRelUtils.java:47)
    at java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:193)
    at java.util.Iterator.forEachRemaining(Iterator.java:116)
    at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801)
    at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:481)
    at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:471)
    at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708)
    at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
    at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:499)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.buildPCollectionList(BeamSqlRelUtils.java:48)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:64)
    at org.apache.beam.sdk.extensions.sql.impl.rel.BeamSqlRelUtils.toPCollection(BeamSqlRelUtils.java:36)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:111)
    at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:79)
    at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:537)
    at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:488)
    at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370)
    at com.sanmina.BeamSQLUnnest.main(BeamSQLUnnest.java:217)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run(ExecJavaMojo.java:282)
    at java.lang.Thread.run(Thread.java:748)

1 Answer


You can use BigQueryIO to achieve this.

// Build the query as a single valid Java string (a literal cannot span lines).
String query = "SELECT `X`.`email` "
    + "FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`, "
    + "UNNEST(`PCOLLECTION`.`details`) AS `X`";

BigQueryIO.readTableRows().fromQuery(query).usingStandardSql();
answered 2019-09-09T15:13:41.777