我正在尝试使用 Samza Runner 运行此处的 WordCount 示例。以下是我的 build.gradle：
// Gradle plugins applied to this build.
plugins {
// Eclipse IDE integration.
id 'eclipse'
// Java compilation support.
id 'java'
// Lets the build launch the program through the configured main class.
id 'application'
// 'shadow' allows us to embed all the dependencies into a fat jar.
id 'com.github.johnrengelman.shadow' version '4.0.3'
}
// Fully qualified entry point; also written into the fat-jar manifest by shadowJar.
mainClassName = 'samples.quickstart.WordCount'

// Repository declarations must live inside a `repositories { }` block. The
// opening line was missing, which left `maven { }` / `mavenCentral()` at top
// level and the closing brace unbalanced.
repositories {
    maven {
        // Use HTTPS so artifacts are not fetched over an insecure channel
        // (newer Gradle versions reject plain-HTTP repositories by default).
        url = uri('https://packages.confluent.io/maven/')
    }
    mavenCentral()
}

// Compile for, and emit bytecode compatible with, Java 8.
sourceCompatibility = 1.8
targetCompatibility = 1.8

// Beam version referenced by the Beam artifacts in the dependencies block.
ext.apacheBeamVersion = '2.22.0'
dependencies {
    // Single source of truth for every Samza artifact so their versions
    // cannot drift apart when experimenting with different releases.
    def samzaVersion = '1.4.0'

    // Beam SDK and runners. All Beam artifacts share $apacheBeamVersion; the
    // Samza runner previously hard-coded '2.22.0' and could silently fall out
    // of sync with the SDK when the shared version was bumped.
    shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"
    runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
    compile "org.apache.beam:beam-runners-samza:$apacheBeamVersion"

    // Logging API plus the java.util.logging binding.
    runtime "org.slf4j:slf4j-api:1.+"
    runtime "org.slf4j:slf4j-jdk14:1.+"

    // Samza modules (Scala 2.11 builds where the artifact is Scala-specific).
    compile "org.apache.samza:samza-api:$samzaVersion"
    compile "org.apache.samza:samza-core_2.11:$samzaVersion"
    compile "org.apache.samza:samza-kafka_2.11:$samzaVersion"
    compile "org.apache.samza:samza-kv_2.11:$samzaVersion"
    compile "org.apache.samza:samza-kv-rocksdb_2.11:$samzaVersion"

    testCompile "junit:junit:4.+"
}
// Configuration of the fat jar produced by the 'shadow' plugin.
shadowJar {
// Enable the ZIP64 format for the archive (needed once a jar outgrows the classic ZIP limits).
zip64 true
baseName = 'WordCount' // Name of the fat jar file.
classifier = null // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
manifest {
attributes('Main-Class': mainClassName) // Specify where the main class resides.
}
}
我的 WordCount.java 如下：
package samples.quickstart;
import org.apache.beam.runners.samza.SamzaRunner;
//import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;
import java.util.Arrays;
/**
 * Word-count pipeline built with Apache Beam and executed on the Samza runner.
 *
 * <p>Reads text files, splits every line into words, counts the occurrences of
 * each distinct word and writes one {@code word: count} line per word.
 */
public class WordCount {
    // Not referenced anywhere in this class.
    private static final String jobName = "beamtest";

    /** File pattern of the input text files. */
    private static final String INPUT_PATTERN = "data/*";

    /** Prefix used for the output files. */
    private static final String OUTPUT_PREFIX = "outputs/part";

    /** Matches one or more consecutive non-letter characters; used as the word delimiter. */
    private static final String NON_LETTERS = "[^\\p{L}]+";

    /**
     * Builds the pipeline and runs it to completion.
     *
     * @param args command-line arguments, parsed into Beam pipeline options; the
     *     runner is set to {@link SamzaRunner} after parsing
     */
    public static void main(String[] args) {
        final PipelineOptions pipelineOptions = PipelineOptionsFactory.fromArgs(args).create();
        pipelineOptions.setRunner(SamzaRunner.class);

        final Pipeline wordCountPipeline = Pipeline.create(pipelineOptions);

        wordCountPipeline
            .apply("Read lines", TextIO.read().from(INPUT_PATTERN))
            // Split each line on runs of non-letter characters.
            .apply(
                "Find words",
                FlatMapElements.into(TypeDescriptors.strings())
                    .via((String text) -> Arrays.asList(text.split(NON_LETTERS))))
            // Splitting can yield empty tokens; drop them before counting.
            .apply("Filter empty words", Filter.by((String token) -> !token.isEmpty()))
            .apply("Count words", Count.perElement())
            // Render each (word, count) pair as "word: count".
            .apply(
                "Write results",
                MapElements.into(TypeDescriptors.strings())
                    .via((KV<String, Long> entry) -> entry.getKey() + ": " + entry.getValue()))
            .apply(TextIO.write().to(OUTPUT_PREFIX));

        wordCountPipeline.run().waitUntilFinish();
    }
}
我正在使用 Beam 2.22.0。我尝试过以下版本组合：Samza 1.4 搭配 Beam 2.22；Samza 1.0 分别搭配 Beam 2.11 和 Beam 2.22；以及 Samza 0.14.1 搭配 Beam 2.11.0。但是，在执行时我收到以下错误：
java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant
我使用的是 Java 1.8。有人知道是什么导致了这个问题吗？