I have defined an AVRO schema, and generated some classes with avro-tools for the schemes. Now, I want to serialize the data to disk. I found some answers about scala for this, but not for Java. The class Article is generated with avro-tools, and is made from a schema defined by me.

Here's a simplified version of the code of how I try to do it:

JavaPairRDD<String, String> filesRDD = context.wholeTextFiles(inputDataPath);
JavaRDD<Article> processingFiles = filesRDD.map(fileNameContent -> {
    // The name of the file
    String fileName = fileNameContent._1();
    // The content of the file
    String fileContent = fileNameContent._2();

    // An object from my avro schema
    Article a = new Article(fileContent);

    Processing processing = new Processing();
    // .... some processing of the content here ... //


    return a;

where serializeArticleToDisk(avroFileName) is defined as follows:

public void serializeArticleToDisk(String filename) throws IOException{
    // Serialize article to disk
    DatumWriter<Article> articleDatumWriter = new SpecificDatumWriter<Article>(Article.class);
    DataFileWriter<Article> dataFileWriter = new DataFileWriter<Article>(articleDatumWriter);
    dataFileWriter.create(this.article.getSchema(), new File(filename));

where Article is my avro schema.

Now, the mapper throws me the error:

java.io.FileNotFoundException: hdfs:/...path.../avroFileName.avro (No such file or directory)   
at java.io.FileOutputStream.open0(Native Method)    
at java.io.FileOutputStream.open(FileOutputStream.java:270)     
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)   
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)   
at org.apache.avro.file.SyncableFileOutputStream.<init>(SyncableFileOutputStream.java:60)   
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at org.apache.avro.file.DataFileWriter.create(DataFileWriter.java:129)
at sentences.ProcessXML.serializeArticleToDisk(ProcessXML.java:207)     
. . . rest of the stacktrace ... 

although the file path is correct.

I use a collect() method afterwards, so everything else within the map function works fine (except for the serialization part).

I am quite new with Spark, so I am not sure if this might be something trivial actually. I suspect that I need to use some writing functions, not to do the writing in the mapper (not sure if this is true, though). Any ideas how to tackle this?


The last line of the error stack-trace I showed, is actually on this part:

dataFileWriter.create(this.article.getSchema(), new File(filename));

This is the part that throws the actual error. I am assuming the dataFileWriter needs to be replaced with something else. Any ideas?


import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.io.NullWritable;
import org.apache.avro.mapred.AvroKey;
import org.apache.spark.api.java.JavaPairRDD;
import scala.Tuple2;

   .  .  .  .  .

// Serializing to AVRO
JavaPairRDD<AvroKey<Article>, NullWritable> javaPairRDD = processingFiles.mapToPair(r -> {    
    return new Tuple2<AvroKey<Article>, NullWritable>(new AvroKey<Article>(r), NullWritable.get());
Job job = AvroUtils.getJobOutputKeyAvroSchema(Article.getClassSchema());
javaPairRDD.saveAsNewAPIHadoopFile(outputDataPath, AvroKey.class, NullWritable.class, AvroKeyOutputFormat.class, 


public static Job getJobOutputKeyAvroSchema(Schema avroSchema) {
    Job job;

    try {
        job = new Job();
    } catch (IOException e) {
        throw new RuntimeException(e);

    AvroJob.setOutputKeySchema(job, avroSchema);
    return job;

可以在这里找到 Spark + Avro 的类似内容 -> https://github.com/CeON/spark-utils

您似乎以错误的方式使用 Spark。

Map是一个变换函数。只是调用map不会调用RDD. 您必须调用类似or的操作forEach()collect()

另请注意,提供给的 lambdamap将在驱动程序处序列化并转移到Node集群中的某些位置。


尝试使用 Spark SQL 和Spark-AvroDataFrame以Avro 格式保存 Spark :

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("/examples/people.txt")

// Apply a schema to an RDD
DataFrame peopleDF = sqlContext.createDataFrame(people, Person.class);
