3

我正在尝试使用 Samza Runner 运行 Beam 的 WordCount 示例（原帖此处附有示例链接）。以下是我的 build.gradle：

plugins {
  id 'eclipse'
  id 'java'
  id 'application'

  // 'shadow' allows us to embed all the dependencies into a fat jar.
  id 'com.github.johnrengelman.shadow' version '4.0.3'
}

mainClassName = 'samples.quickstart.WordCount'

// A `maven { ... }` declaration is only valid inside a `repositories { ... }` block;
// without it the script fails to evaluate.
repositories {
  // HTTPS instead of HTTP: newer Gradle versions refuse insecure repository URLs.
  maven {
    url = uri('https://packages.confluent.io/maven/')
  }
  mavenCentral()
}

sourceCompatibility = 1.8
targetCompatibility = 1.8

// Single source of truth for each framework version, so Beam artifacts cannot drift
// apart from each other and Samza artifacts cannot drift apart from each other.
ext.apacheBeamVersion = '2.22.0'
ext.samzaVersion = '1.4.0'

dependencies {
  shadow "org.apache.beam:beam-sdks-java-core:$apacheBeamVersion"

  runtime "org.apache.beam:beam-runners-direct-java:$apacheBeamVersion"
  runtime "org.slf4j:slf4j-api:1.+"
  runtime "org.slf4j:slf4j-jdk14:1.+"

  // The Samza runner must be the same Beam version as the SDK above.
  compile "org.apache.beam:beam-runners-samza:$apacheBeamVersion"

  // NOTE(review): the `_2.11` suffix is the Scala binary version. All samza-*_2.xx
  // artifacts must share it, and it should match what beam-runners-samza was built
  // against. Confirm against that artifact's POM.
  compile "org.apache.samza:samza-api:$samzaVersion"
  compile "org.apache.samza:samza-core_2.11:$samzaVersion"
  compile "org.apache.samza:samza-kafka_2.11:$samzaVersion"
  compile "org.apache.samza:samza-kv_2.11:$samzaVersion"
  compile "org.apache.samza:samza-kv-rocksdb_2.11:$samzaVersion"

  testCompile "junit:junit:4.+"
}

shadowJar {
  zip64 true
  baseName = 'WordCount'  // Name of the fat jar file.
  classifier = null       // Set to null, otherwise 'shadow' appends a '-all' to the jar file name.
  manifest {
    attributes('Main-Class': mainClassName)  // Specify where the main class resides.
  }
}

我的 WordCount.java 如下：

package samples.quickstart;

import org.apache.beam.runners.samza.SamzaRunner;
//import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.FlatMapElements;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TypeDescriptors;

import java.util.Arrays;
import java.util.regex.Pattern;

/**
 * Counts how often each word occurs in the text files matched by {@code data/*} and writes
 * one {@code "word: count"} line per distinct word to files prefixed with {@code outputs/part}.
 *
 * <p>The pipeline always runs on the Samza runner; every other pipeline option is parsed
 * from the command-line arguments.
 */
public class WordCount {

  /** File pattern of the input text files. */
  private static final String INPUTS_GLOB = "data/*";

  /** Prefix of the output files written by the final {@code TextIO.write()} step. */
  private static final String OUTPUTS_PREFIX = "outputs/part";

  /**
   * Matches every run of characters that are not Unicode letters; used as the word
   * separator. Compiled once instead of on every {@code String.split} call.
   */
  private static final Pattern NON_LETTERS = Pattern.compile("[^\\p{L}]+");

  /**
   * Builds and runs the word-count pipeline, blocking until it finishes.
   *
   * @param args command-line pipeline options (any {@code --runner} value is overridden)
   */
  public static void main(String[] args) {
    PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
    // Set after parsing, so this wins over any --runner flag supplied in args.
    options.setRunner(SamzaRunner.class);

    Pipeline pipeline = Pipeline.create(options);

    pipeline
        .apply("Read lines", TextIO.read().from(INPUTS_GLOB))
        // Pattern.split(line) has the same semantics as line.split(regex): trailing empty
        // strings are dropped, while a leading empty string can remain (filtered below).
        .apply("Find words", FlatMapElements.into(TypeDescriptors.strings())
            .via((String line) -> Arrays.asList(NON_LETTERS.split(line))))
        .apply("Filter empty words", Filter.by((String word) -> !word.isEmpty()))
        .apply("Count words", Count.perElement())
        .apply("Write results", MapElements.into(TypeDescriptors.strings())
            .via((KV<String, Long> wordCount) ->
                  wordCount.getKey() + ": " + wordCount.getValue()))
        .apply(TextIO.write().to(OUTPUTS_PREFIX));
    pipeline.run().waitUntilFinish();
  }
}


我使用的是 Beam 2.22.0。我尝试过以下版本组合：Samza 1.4 + Beam 2.22、Samza 1.0 + Beam 2.11、Samza 1.0 + Beam 2.22，以及 Samza 0.14.1 + Beam 2.11.0。但无论哪种组合，运行时都会出现以下错误：

java.lang.IncompatibleClassChangeError: Method 'void org.apache.samza.storage.kv.KeyValueStore.deleteAll(java.util.List)' must be InterfaceMethodref constant

我使用的是 Java 1.8。有人知道这个问题是由什么引起的吗？

4

1 回答 1

0

能否把 build.gradle 以及改为使用 Samza runner 的 WordCount.java 贴出来？这样我们可以排查这是版本不兼容问题还是配置问题。感谢您试用 Samza runner！

回答于 2020-07-15T00:28:04.183