1

我有一个 Scrunch Spark 管道,当我尝试使用以下命令将其输出保存为 Avro 格式时:

data.write(to.avroFile(path))

我得到以下异常:

java.lang.ClassCastException: org.apache.crunch.types.writable.WritableType cannot be cast to org.apache.crunch.types.avro.AvroType
at org.apache.crunch.io.avro.AvroFileTarget.configureForMapReduce(AvroFileTarget.java:77) ~[crunch-core-0.14.0-SNAPSHOT.jar:0.14.0-SNAPSHOT]
at org.apache.crunch.impl.spark.SparkRuntime.monitorLoop(SparkRuntime.java:327) [crunch-spark-0.14.0-SNAPSHOT.jar:0.14.0-SNAPSHOT]
at org.apache.crunch.impl.spark.SparkRuntime.access$000(SparkRuntime.java:80) [crunch-spark-0.14.0-SNAPSHOT.jar:0.14.0-SNAPSHOT]
at org.apache.crunch.impl.spark.SparkRuntime$2.run(SparkRuntime.java:139) [crunch-spark-0.14.0-SNAPSHOT.jar:0.14.0-SNAPSHOT]

而这反而工作得很好:

data.write(to.textFile(path))

(在这两种情况下都是一个字符串data同一个实例)PCollectionpath

我理解为什么会发生这个错误,PCollection我试图写的属于Writable类型家族而不是那个Avro。我不清楚的是如何在 Scrunch 中决定我的 PCollection 属于一个而不是另一个。

然而,这种机制在 Crunch 中似乎更加清晰。根据官方 Crunch 文档

Crunch 支持两种不同的类型族,它们都实现了 PTypeFamily 接口:一种用于 Hadoop 的 Writable 接口,另一种基于 Apache Avro。还有一些类包含每个 PTypeFamily 的静态工厂方法,以便于导入和使用:一个用于 Writables,一个用于 Avros。

接着:

对于您的大多数管道,您将只使用一个类型系列,因此您可以通过将 Writables 或 Avros 类中的所有方法导入您的类来减少类中的一些样板文件

导入静态 org.apache.crunch.types.avro.Avros.*;

事实上,在官方 repo 中为 Crunch 提供的示例中,可以看出这是如何明确表达的。请参阅WordCount示例中的以下代码片段:

PCollection<String> lines = pipeline.readTextFile(args[0]);

PCollection<String> words = lines.parallelDo(new DoFn<String, String>() {
  public void process(String line, Emitter<String> emitter) {
    for (String word : line.split("\\s+")) {
      emitter.emit(word);
    }
  }
}, Writables.strings()); // Indicates the serialization format

PTable<String, Long> counts = words.count();

虽然等效的Scrunch 版本是这样的:

 def countWords(file: String) = {
read(from.textFile(file))
  .flatMap(_.split("\\W+").filter(!_.isEmpty()))
  .count

}

并且没有提供明确的或据我所见的隐式引用WritableFamily

那么 Scrunch 是如何决定使用什么 Writable family 类型的呢?它是基于原始输入源的默认值吗?(例如,如果从文本文件读取,它是可写的,如果从 Avro 然后是 Avro)如果是这种情况,那么我如何更改类型以从一个源读取并写入目标 taht 在 Scrunch 中属于不同的系列类型?

4

0 回答 0