我正在开发一个数据流管道,它使用SqlTransform
库以及org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf
.
这是一张代码幻灯片:
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
public class EtlSqlTransformations extends PTransform<PCollection<Row>, PCollection<Row>> {
@Override
public PCollection<Row> expand(PCollection<Row> input) {
String sql1 = "SELECT campaign_id, format_id, story_id, browser, device, os, " +
" COUNTIF(event_type = 'track_click_through') as clickthrough, " +
" FROM PCOLLECTION " +
" GROUP BY campaign_id, format_id, story_id, browser, device, os";
PCollection<Row> groupedImpressions = input.apply("groupedImpressions",
SqlTransform.query(sql1).registerUdaf("COUNTIF", new CountIf.CountIfFn()));
return groupedImpressions;
}
}
这在本地测试时工作正常(我还创建了一些测试,工作正常):
PCollection<Row> results = rowPCollection.apply("SQL TRANSFORM", new EtlSqlTransformations());
// Expected output
Row expectedRow = Row.withSchema(EtlSqlTransformations.outputSchema)
.addValues("1044", ...)
.build();
PAssert.that(results).containsInAnyOrder(expectedRow);
pipeline.run();
问题是当我想使用 Dataflow 在 Google 云中部署时,我有以下输出:
[WARNING]
java.lang.RuntimeException: java.io.IOException: Could not obtain a Coder for the accumulator
at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:78)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: java.io.IOException: Could not obtain a Coder for the accumulator
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:207)
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
Caused by: org.apache.beam.sdk.coders.CannotProvideCoderException: Cannot infer coder for type parameter AccumT
at org.apache.beam.sdk.coders.CoderRegistry.getCoder (CoderRegistry.java:328)
at org.apache.beam.sdk.transforms.CombineFnBase$AbstractGlobalCombineFn.getAccumulatorCoder (CombineFnBase.java:119)
at org.apache.beam.sdk.transforms.Combine$CombineFn.getAccumulatorCoder (Combine.java:391)
at org.apache.beam.sdk.extensions.sql.impl.transform.agg.AggregationCombineFnAdapter$WrappedCombinerBase.getAccumulatorCoder (AggregationCombineFnAdapter.java:75)
at org.apache.beam.sdk.transforms.CombineFns$ComposedCombineFn.getAccumulatorCoder (CombineFns.java:430)
at org.apache.beam.sdk.schemas.transforms.SchemaAggregateFn$Inner.getAccumulatorCoder (SchemaAggregateFn.java:335)
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.extractAccumulatorCoder (CombineTranslation.java:204)
at org.apache.beam.runners.core.construction.CombineTranslation$CombineGroupedValuesPayloadTranslator.translate (CombineTranslation.java:179)
at org.apache.beam.runners.core.construction.PTransformTranslation$KnownTransformPayloadTranslator.translate (PTransformTranslation.java:438)
at org.apache.beam.runners.core.construction.PTransformTranslation.toProto (PTransformTranslation.java:248)
at org.apache.beam.runners.core.construction.SdkComponents.registerPTransform (SdkComponents.java:175)
at org.apache.beam.runners.core.construction.PipelineTranslation$1.leaveCompositeTransform (PipelineTranslation.java:75)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:584)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit (TransformHierarchy.java:579)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500 (TransformHierarchy.java:239)
at org.apache.beam.sdk.runners.TransformHierarchy.visit (TransformHierarchy.java:213)
at org.apache.beam.sdk.Pipeline.traverseTopologically (Pipeline.java:468)
at org.apache.beam.runners.core.construction.PipelineTranslation.toProto (PipelineTranslation.java:60)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:996)
at org.apache.beam.runners.dataflow.DataflowRunner.run (DataflowRunner.java:203)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:322)
at org.apache.beam.sdk.Pipeline.run (Pipeline.java:308)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.runPubsubToStoragePipeline (PubsubToStorage.java:115)
at com.hypertv.AnalyticsLoggerStore.PubsubToStorage.main (PubsubToStorage.java:122)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0 (Native Method)
at jdk.internal.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62)
at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke (DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke (Method.java:566)
at org.codehaus.mojo.exec.ExecJavaMojo$1.run (ExecJavaMojo.java:282)
at java.lang.Thread.run (Thread.java:829)
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 57.006 s
[INFO] Finished at: 2021-10-25T13:54:23Z
[INFO] ------------------------------------------------------------------------
[ERROR] Failed to execute goal org.codehaus.mojo:exec-maven-plugin:1.6.0:java (default-cli) on project connected-stories-analytics-logger-store: An exception occured while executing the Java class. java.io.IOException: Could not obtain a Coder for the accumulator: Cannot infer coder for type parameter AccumT -> [Help 1]
我搜索了这个问题,但我什么也没找到,只是一些类似的案例,他们开发了自己的 Agg 函数并且他们需要定义 CODER。在这种情况下,我不知道在哪里可以找到编码器,或者我是否必须自己创建它,因为我使用的是 Beam 函数 ( org.apache.beam.sdk.extensions.sql.impl.transform.agg.CountIf
)。
所以问题是:
- 有没有办法使用这个Beam函数来添加需要在Dataflow中运行的编码器?
- 如果我需要更改功能并创建一个新功能和编码器,我该怎么做?
谢谢