我想使用 Beam 测试不同的流处理引擎,但是在包含 Flink和Samza 依赖项时无法运行程序。如果只包括其中一个,它适用于所有其他跑步者。
我的pom.xml
包含以下内容:
<properties>
<maven.compiler.release>1.11</maven.compiler.release>
<beam.version>2.20.0</beam.version>
<flink.version>1.9</flink.version>
<samza.version>1.4.0</samza.version>
<spark.version>2.4.5</spark.version>
<jackson.version>2.10.2</jackson.version>
</properties>
<dependencies>
<!-- Beam dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-sdks-java-core</artifactId>
<version>${beam.version}</version>
</dependency>
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-direct-java</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Flink dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-flink-${flink.version}</artifactId>
<version>${beam.version}</version>
</dependency>
<!-- Samza dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-samza</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-api</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-core_2.11</artifactId>
<version>${samza.version}</version>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kafka_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.samza</groupId>
<artifactId>samza-kv-rocksdb_2.11</artifactId>
<version>${samza.version}</version>
<scope>runtime</scope>
</dependency>
<!-- Spark dependencies -->
<dependency>
<groupId>org.apache.beam</groupId>
<artifactId>beam-runners-spark</artifactId>
<version>${beam.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>${spark.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>${spark.version}</version>
<scope>runtime</scope>
</dependency>
<!-- additional dependencies -->
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_2.11</artifactId>
<version>${jackson.version}</version>
<scope>runtime</scope>
</dependency>
</dependencies>
尝试执行时抛出的错误信息PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
是:
Exception in thread "main" java.lang.ExceptionInInitializerError
at WordCount.main(WordCount.java:20)
Caused by: java.lang.IllegalArgumentException: methods with same signature getMaxBundleSize() but incompatible return types: long and others
at java.base/java.lang.reflect.ProxyGenerator.checkReturnTypes(ProxyGenerator.java:657)
at java.base/java.lang.reflect.ProxyGenerator.generateClassFile(ProxyGenerator.java:462)
at java.base/java.lang.reflect.ProxyGenerator.generateProxyClass(ProxyGenerator.java:338)
at java.base/java.lang.reflect.Proxy$ProxyBuilder.defineProxyClass(Proxy.java:535)
at java.base/java.lang.reflect.Proxy$ProxyBuilder.build(Proxy.java:648)
at java.base/java.lang.reflect.Proxy.lambda$getProxyConstructor$1(Proxy.java:426)
at java.base/jdk.internal.loader.AbstractClassLoaderValue$Memoizer.get(AbstractClassLoaderValue.java:329)
at java.base/jdk.internal.loader.AbstractClassLoaderValue.computeIfAbsent(AbstractClassLoaderValue.java:205)
at java.base/java.lang.reflect.Proxy.getProxyConstructor(Proxy.java:424)
at java.base/java.lang.reflect.Proxy.getProxyClass(Proxy.java:384)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.validateWellFormed(PipelineOptionsFactory.java:1898)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.validateWellFormed(PipelineOptionsFactory.java:1842)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.register(PipelineOptionsFactory.java:1837)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.initializeRegistry(PipelineOptionsFactory.java:1825)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.<init>(PipelineOptionsFactory.java:1817)
at org.apache.beam.sdk.options.PipelineOptionsFactory$Cache.<init>(PipelineOptionsFactory.java:1786)
at org.apache.beam.sdk.options.PipelineOptionsFactory.resetCache(PipelineOptionsFactory.java:542)
at org.apache.beam.sdk.options.PipelineOptionsFactory.<clinit>(PipelineOptionsFactory.java:508)
... 1 more
任何人都可以帮助我吗?