0

我正在开发一个 Dataflow 管道,它使用了 SqlTransform 库以及 org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf。

这是一段代码:

import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;

public class EtlSqlTransformations extends PTransform<PCollection<Row>, PCollection<Row>> {
    @Override
    public PCollection<Row> expand(PCollection<Row> input) {
        String sql1 = "SELECT campaign_id, format_id, story_id, browser, device, os, " +
                "       COUNTIF(event_type = 'track_click_through') as clickthrough, " +
                "       FROM PCOLLECTION " +
                "       GROUP BY campaign_id, format_id, story_id, browser, device, os";

        PCollection<Row> groupedImpressions = input.apply("groupedImpressions",
                SqlTransform.query(sql1).registerUdaf("COUNTIF", new CountIf.CountIfFn()));
        return groupedImpressions;
    }
}

这段代码在本地测试时运行正常(我还编写了一些测试,也都能通过):


        // Apply the transform under test to the input rows.
        // NOTE(review): presumably run locally on the DirectRunner, which did not surface the
        // accumulator-coder error that DataflowRunner raised while translating the pipeline.
        PCollection<Row> results = rowPCollection.apply("SQL TRANSFORM", new EtlSqlTransformations());

        // Expected output
        // NOTE(review): outputSchema is not defined in the EtlSqlTransformations class shown
        // above — confirm where it is declared.
        Row expectedRow = Row.withSchema(EtlSqlTransformations.outputSchema)
                .addValues("1044", ...)
                .build();

        // Assert the result contains exactly the expected row(s), in any order.
        PAssert.that(results).containsInAnyOrder(expectedRow);
        // PAssert checks are only evaluated when the pipeline is run.
        pipeline.run();

问题在于,当我想用 Dataflow 将其部署到 Google Cloud 时,得到了以下输出:

[WARNING] 
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
    at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
    at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
    at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
    at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
    at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
    at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
    at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
    at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
    at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
    at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
    at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
    at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
    at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
    at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
    at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
    at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
    at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
    at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
    at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
    at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke (Method.java:566)
    at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
    at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time:  57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]

我搜索了这个问题,但没有找到答案,只找到一些类似的案例:他们开发了自己的聚合(Agg)函数,因此需要定义编码器(Coder)。而我使用的是 Beam 自带的函数(org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf),所以不知道该去哪里找这个编码器,也不知道是否必须自己创建它。

所以问题是:

  1. 有没有办法在使用这个 Beam 函数的同时,添加在 Dataflow 中运行所需的编码器(Coder)?
  2. 如果需要修改该函数,自己创建一个新的函数和编码器,应该怎么做?

谢谢

4

1 回答 1

0

好的,我通过自己实现 COUNTIF 解决了这个问题。

    /**
     * COUNTIF aggregate: counts the inputs that are {@code TRUE}.
     *
     * <p>The type parameters are concrete ({@code Boolean} in, {@code Long} accumulator,
     * {@code Long} out), which is what lets Beam infer the accumulator coder without an explicit
     * override. {@code FALSE} and {@code null} inputs are not counted.
     */
    public static class CountConditional extends Combine.CombineFn<Boolean, Long, Long> {

        /** Every group starts at a count of zero. */
        @Override
        public Long createAccumulator() {
            return 0L;
        }

        /**
         * Adds one when {@code input} is {@code TRUE}. Uses {@code Boolean.TRUE.equals} instead of
         * {@code if (input)} so a {@code null} input (e.g. a SQL comparison against a NULL column)
         * is skipped rather than throwing a NullPointerException on auto-unboxing.
         */
        @Override
        public Long addInput(Long accumulator, Boolean input) {
            return Boolean.TRUE.equals(input) ? accumulator + 1 : accumulator;
        }

        /** Sums partial counts; a primitive total avoids re-boxing on every addition. */
        @Override
        public Long mergeAccumulators(Iterable<Long> accumulators) {
            long total = 0L;
            for (Long partial : accumulators) {
                total += partial;
            }
            return total;
        }

        /** The accumulator already holds the final count. */
        @Override
        public Long extractOutput(Long accumulator) {
            return accumulator;
        }
    }

我把它放在使用它的同一个 PTransform 中,这样就不必自己处理编码器(Coder)了:由于这个 CombineFn 的累加器类型是具体的 Long,Beam 可以自动推断出累加器的编码器。

于 2021-10-29T08:33:14.730 回答