2

我正在使用从 BigQuery 表和文件读取的 apache Beam 编写的 google 数据流上运行一项工作。转换数据并将其写入其他 BigQuery 表。工作“通常”成功,但有时我在从大查询表中读取时随机得到空指针异常并且我的工作失败:

(288abb7678892196): java.lang.NullPointerException
at org.apache.beam.sdk.io.gcp.bigquery.BigQuerySourceBase.split(BigQuerySourceBase.java:98)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.splitAndValidate(WorkerCustomSources.java:261)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitTyped(WorkerCustomSources.java:209)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplitWithApiLimit(WorkerCustomSources.java:184)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSources.performSplit(WorkerCustomSources.java:161)
at com.google.cloud.dataflow.worker.runners.worker.WorkerCustomSourceOperationExecutor.execute(WorkerCustomSourceOperationExecutor.java:47)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.executeWork(DataflowWorker.java:341)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.doWork(DataflowWorker.java:297)
at com.google.cloud.dataflow.worker.runners.worker.DataflowWorker.getAndPerformWork(DataflowWorker.java:244)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.doWork(DataflowBatchWorkerHarness.java:125)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:105)
at com.google.cloud.dataflow.worker.runners.worker.DataflowBatchWorkerHarness$WorkerThread.call(DataflowBatchWorkerHarness.java:92)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

我无法弄清楚这与什么有关。当我清除临时目录并重新上传我的模板时,作业再次通过。

我从 BQ 阅读的方式很简单:

BigQueryIO.read().fromQuery()

我将不胜感激任何帮助。

任何人?

4

3 回答 3

3

我最终在谷歌问题跟踪器中添加了错误。经过与谷歌员工的长时间交谈和他们的调查后发现,将模板与从 BigQuery 读取的数据流批处理作业一起使用是没有意义的,因为您只能执行一次它们。

引用:“对于 BigQuery 批处理管道,模板只能执行一次,因为 BigQuery 作业 ID 是在创建模板时设置的。此限制将在 SDK 2 的未来版本中删除,但我不能说。创建模板:https ://cloud.google.com/dataflow/docs/templates/creating-templates#pipeline-io-and-runtime-parameters “

如果错误比 NullpointerException 更清楚,那还是很好的。

无论如何,我希望这对将来的人有所帮助。

如果有人对整个对话感兴趣,这就是问题所在: https ://issuetracker.google.com/issues/63124894

于 2017-07-13T14:23:11.763 回答
2

我也遇到了这个问题,经过一番挖掘,发现限制已在 2.2.0 版本中删除。但是,它还没有正式发布。您可以在他们的JIRA 项目上查看此版本的进度(似乎只剩下一个问题)。

但是如果你现在想使用它,你可以自己编译它,这并不难。只需从他们的github 镜像中签出源代码,签出标签v2.2.0-RC4,然后运行mvn clean install。然后只需修改您的项目依赖项pom.xml以指向版本2.2.0

从 2.2.0 开始,如果你想使用BigQueryIO模板,你需要调用withTemplateCompatibility()

BigQueryIO
    .readTableRows() // read() has been deprecated in 2.2.0
    .withTemplateCompatibility() // You need to add this
    .fromQuery(options.getInputQuery())

我目前正在为我的项目使用 2.2.0,到目前为止它运行良好。

于 2017-11-27T03:40:01.220 回答
1

好的,让我提供更多细节。

  • 作业作为模板上传并在谷歌数据流上运行
  • 工作通常会成功——这就是为什么我怀疑实际代码有问题。异常来自源,它看起来像:bqServices.getDatasetService(bqOptions)在 BigQuerySourceBase 中返回 null
  • 是的,我确实提供了实际查询

下面是我工作的 DAG。如您所见,此运行成功。它处理了从 BQ 导出的超过 200 万行,从 csv 文件中导出的 150 万行,并将 800k 写回 BigQuery(数字正确)。这项工作基本上按预期工作(当它工作时)。左上角(读取事务)是对 BQ 进行查询的步骤。这一步有时会无缘无故地失败。

成功运行 - Beam DAG

下面是在 BQ 源上使用 Nullpointer 失败时的相同作业。

运行失败 - Beam DAG

我不确定在这种情况下代码片段会有多大帮助,但这是执行查询的一部分:

PCollection<Transaction> transactions = p.apply("Read Transactions", BigQueryIO.read().fromQuery(createTransactionQuery(options)))
                                        .apply("Map to Transaction", MapElements.via(new TableRowToTransactionFn()));

    PCollection<KV<String, Transaction>> transactionsPerMtn = 
            transactions.apply("Filter Transactions Without MTN", Filter.by(t -> t.transactionMtn != null))
                        .apply("Map Transactions to MTN key", MapElements.into(
                    TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptor.of(Transaction.class)))
                                    .via(t -> KV.of(t.transactionMtn, t)));

在获取查询的方法下方:

private ValueProvider<String> createTransactionQuery(TmsPipelineOptions options) {
    return NestedValueProvider.of(options.getInputTransactionTable(), table -> {
        StringBuilder sb = new StringBuilder();
        sb.append(
                "SELECT transaction_id, transaction_mtn, transaction_folio_number, transaction_payer_folio_number FROM ");
        sb.append(table);
        return sb.toString();
    });
}

我相信大查询源中存在某种错误,会导致这样的问题。我只是无法确定是什么原因造成的,因为它是随机发生的。就像我写的那样,上次遇到它时,我刚刚清除了 gcs 上的临时目录并重新上传了我的模板(没有任何代码更改),然后工作又开始工作了。

于 2017-06-26T08:02:37.137 回答