
I have defined an Avro schema and generated some classes with avro-tools for the schemas. Now I want to serialize the data to disk. I found some answers about this for Scala, but not for Java. The class Article is generated with avro-tools from a schema defined by me.

Here's a simplified version of how I try to do it:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //

    processing.serializeArticleToDisk(avroFileName);

    return a;
});

where serializeArticleToDisk(avroFileName) is defined as follows:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));
    dataFileWriter.append(this.article);
    dataFileWriter.close();
}

where Article is the class generated from my Avro schema.

Now the mapper throws this error:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

although the file path is correct.

I call collect() afterwards, so everything else within the map function works fine (except for the serialization part).

I am quite new to Spark, so I am not sure if this might actually be something trivial. I suspect that I need to use some writing functions instead of doing the writing in the mapper (not sure if this is true, though). Any ideas how to tackle this?

EDIT:

The last line of the stack trace I showed actually points to this part:

dataFileWriter.create(this.article.getSchema(), new File(filename));

This is the part that throws the actual error. I am assuming the dataFileWriter needs to be replaced with something else. Any ideas?


2 Answers


This solution does not use dataframes and does not throw any errors:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.mapred.AvroKey;
import org.apache.avro.mapreduce.AvroJob;
import org.apache.avro.mapreduce.AvroKeyOutputFormat;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

   .  .  .  .  .

// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {    
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
});
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 
        job.getConfiguration());
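
saveAsNewAPIHadoopFile delegates the writing to Hadoop's OutputFormat machinery, so each partition is written out by the executor that owns it; this is why it works where opening a java.io.File inside the mapper did not.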

where AvroUtils.getJobOutputKeyAvroSchema is:

public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
    Job job;

    try {
        job = Job.getInstance();
    } catch (IOException e) {
        throw new RuntimeException(e);
    }

    AvroJob.setOutputKeySchema(job, avroSchema);
    return job;
}
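
Reading the files back is symmetric; a minimal sketch, assuming AvroKeyInputFormat from the same Avro mapreduce package (untested):

// Reuse the helper to obtain a configured Job, then set the input key schema.
Job readJob = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
AvroJob.setInputKeySchema(readJob, Article.getClassSchema());

// Generics simplified for readability; expect unchecked warnings in real code.
JavaPairRDD<AvroKey, NullWritable> records = context.newAPIHadoopFile(
        outputDataPath, AvroKeyInputFormat.class, AvroKey.class,
        NullWritable.class, readJob.getConfiguration());
JavaRDD<Article> articles = records.map(t -> (Article) t._1().datum());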

Something similar for Spark + Avro can be found here -> https://github.com/CeON/spark-utils

answered 2016-04-13T18:07:15.173

You seem to be using Spark in the wrong way.

map is a transformation function; merely calling map does not trigger evaluation of the RDD. You have to call an action such as forEach() or collect().

Also note that the lambda you pass to map will be serialized at the driver and transferred to some node in the cluster.
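
For example, a minimal sketch (process() is a placeholder for your own logic):

// map() is lazy: this line only records the transformation, nothing runs yet.
JavaRDD<Article> processed = filesRDD.map(pair -> process(pair._2()));

// Only an action triggers the actual job; the lambda above then runs on the
// executors, not on the driver.
List<Article> results = processed.collect();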

ADDED:

Try using Spark SQL and Spark-Avro to save a Spark DataFrame in Avro format:

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("/examples/people.txt")
    .map(Person::parse);

// Apply a schema to an RDD
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class);
peopleDF.write()
    .format("com.databricks.spark.avro")
    .save("/output");
answered 2016-04-11T11:27:07.897