
I am looking for an example of reading and writing sequence files using the new API.

Specifically, I need to know how to use this function:

 createWriter(Configuration conf, org.apache.hadoop.io.SequenceFile.Writer.Option... opts) 

The old signature does not work for me:

SequenceFile.createWriter(fs, conf, path, key.getClass(), value.getClass());

Likewise, I need to know what the code for reading a sequence file is, since the following is deprecated:

SequenceFile.Reader(fs, path, conf);

Here is my attempt using that method:

    String uri = args[0];
    Configuration conf = new Configuration();
    Path path = new Path(uri);

    IntWritable key = new IntWritable();
    Text value = new Text();

    CompressionCodec codec = new GzipCodec();
    SequenceFile.Writer writer = null;
    Option optPath = SequenceFile.Writer.file(path);
    Option optKey = SequenceFile.Writer.keyClass(key.getClass());
    Option optVal = SequenceFile.Writer.valueClass(value.getClass());
    Option optCom = SequenceFile.Writer.compression(CompressionType.RECORD, codec);

    writer = SequenceFile.createWriter(conf, optPath, optKey, optVal, optCom);
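
For the read side, I would guess the option-based equivalent looks something like this sketch (untested; path, conf, key, and value are the same variables as above):

    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            SequenceFile.Reader.file(path));
    try {
        // Reuse the key/value instances declared above for each record.
        while (reader.next(key, value)) {
            System.out.println(key + "\t" + value);
        }
    } finally {
        reader.close();
    }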

4 Answers

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.Text;
import org.junit.Test;

public class SequenceFilesTest {
  @Test
  public void testSeqFileReadWrite() throws IOException {
    Configuration conf = new Configuration();
    Path seqFilePath = new Path("file.seq");

    // The option-based writer resolves the file system from the
    // Configuration, so no FileSystem argument is needed.
    SequenceFile.Writer writer = SequenceFile.createWriter(conf,
            Writer.file(seqFilePath), Writer.keyClass(Text.class),
            Writer.valueClass(IntWritable.class));

    writer.append(new Text("key1"), new IntWritable(1));
    writer.append(new Text("key2"), new IntWritable(2));

    writer.close();

    // Read the records back with the matching option-based Reader.
    SequenceFile.Reader reader = new SequenceFile.Reader(conf,
            Reader.file(seqFilePath));

    Text key = new Text();
    IntWritable val = new IntWritable();

    while (reader.next(key, val)) {
        System.err.println(key + "\t" + val);
    }

    reader.close();
  }
}
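
If you also need compression, as in the question, my understanding is that a compression option can simply be passed alongside the others; a minimal sketch (assumes GzipCodec from org.apache.hadoop.io.compress is on the classpath):

    SequenceFile.Writer compressedWriter = SequenceFile.createWriter(conf,
            Writer.file(seqFilePath),
            Writer.keyClass(Text.class),
            Writer.valueClass(IntWritable.class),
            // Record-level Gzip compression; an assumption, not part of the answer above.
            Writer.compression(SequenceFile.CompressionType.RECORD, new GzipCodec()));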
answered 2013-04-17T23:46:55.320

I am more than a year late to answer, but I just got started with Hadoop 2.4.1 :)

Below is the code; someone may find it useful.

Note: it includes the commented-out 1.x code for reading and writing sequence files. I was wondering where it picks up the file system, but when I executed it directly on the cluster, it picked the right one (probably from the core-site.xml mentioned in the Configuration).
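
If you want to point it at a specific cluster instead of relying on core-site.xml, I believe you can set the default file system on the Configuration directly; a small sketch (the host and port are placeholders):

    // Sketch: target a specific HDFS instance rather than core-site.xml.
    // "hdfs://namenode-host:9000" is a placeholder, not a real address.
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://namenode-host:9000");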

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Reader.Option;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileOperator {

    private Configuration conf = new Configuration();
    /*private FileSystem fs;
    {
        try {
            fs = FileSystem.get(URI.create("hdfs://cldx-1336-1202:9000"), conf);
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }*/

    public static void main(String[] args) throws IOException {

        if (args == null || args.length < 2) {

            System.out
                    .println("Following are the possible invocations <operation id> <arg1> <arg2> ...");

            System.out
                    .println("1 <absolute path of directory containing documents> <HDFS path of the sequence file>");

            System.out.println("2 <HDFS path of the sequence file>");
            return;
        }

        int operation = Integer.valueOf(args[0]);

        SequenceFileOperator docToSeqFileWriter = new SequenceFileOperator();

        switch (operation) {

        case 1: {
            if (args.length < 3) {
                System.out.println("Operation 1 needs both a document directory and a sequence file path");
                return;
            }
            String docDirectoryPath = args[1];
            String sequenceFilePath = args[2];

            System.out.println("Writing files present at " + docDirectoryPath
                    + " to the sequence file " + sequenceFilePath);

            docToSeqFileWriter.loadDocumentsToSequenceFile(docDirectoryPath,
                    sequenceFilePath);

            break;
        }

        case 2: {

            String sequenceFilePath = args[1];

            System.out.println("Reading the sequence file " + sequenceFilePath);

            docToSeqFileWriter.readSequenceFile(sequenceFilePath);

            break;
        }

        }

    }

    private void readSequenceFile(String sequenceFilePath) throws IOException {
        // Print each record's byte offset, key, and value type.

        /*
         * SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(fs,
         * new Path(sequenceFilePath), conf);
         */
        Option filePath = SequenceFile.Reader.file(new Path(sequenceFilePath));
        SequenceFile.Reader sequenceFileReader = new SequenceFile.Reader(conf,
                filePath);

        Writable key = (Writable) ReflectionUtils.newInstance(
                sequenceFileReader.getKeyClass(), conf);
        Writable value = (Writable) ReflectionUtils.newInstance(
                sequenceFileReader.getValueClass(), conf);

        try {

            while (sequenceFileReader.next(key, value)) {

                System.out
                        .printf("[%s] %s %s \n",
                                sequenceFileReader.getPosition(), key,
                                value.getClass());
            }
        } finally {
            IOUtils.closeStream(sequenceFileReader);
        }

    }

    private void loadDocumentsToSequenceFile(String docDirectoryPath,
            String sequenceFilePath) throws IOException {
        // Append each file in the directory as one (file name, file bytes) record.

        File docDirectory = new File(docDirectoryPath);

        if (!docDirectory.isDirectory()) {
            System.out
                    .println("Please provide an absolute path of a directory that contains the documents to be added to the sequence file");
            return;
        }

        /*
         * SequenceFile.Writer sequenceFileWriter =
         * SequenceFile.createWriter(fs, conf, new Path(sequenceFilePath),
         * Text.class, BytesWritable.class);
         */
        org.apache.hadoop.io.SequenceFile.Writer.Option filePath = SequenceFile.Writer
                .file(new Path(sequenceFilePath));
        org.apache.hadoop.io.SequenceFile.Writer.Option keyClass = SequenceFile.Writer
                .keyClass(Text.class);
        org.apache.hadoop.io.SequenceFile.Writer.Option valueClass = SequenceFile.Writer
                .valueClass(BytesWritable.class);

        SequenceFile.Writer sequenceFileWriter = SequenceFile.createWriter(
                conf, filePath, keyClass, valueClass);

        File[] documents = docDirectory.listFiles();

        try {
            for (File document : documents) {

                RandomAccessFile raf = new RandomAccessFile(document, "r");
                byte[] content = new byte[(int) raf.length()];

                raf.readFully(content);

                sequenceFileWriter.append(new Text(document.getName()),
                        new BytesWritable(content));

                raf.close();
            }
        } finally {
            IOUtils.closeStream(sequenceFileWriter);
        }

    }
}
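
For example, invocations might look like this (the jar name and paths are placeholders):

    hadoop jar seqfile-ops.jar SequenceFileOperator 1 /local/docs /user/me/docs.seq
    hadoop jar seqfile-ops.jar SequenceFileOperator 2 /user/me/docs.seq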
answered 2014-08-25T11:09:59.840

For reading, you can use:

Path path = new Path("/bar");
Reader sequenceFileReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(path));
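From there, iterating with sequenceFileReader.next(key, value) and closing the reader work the same way as in the answers above.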
answered 2015-04-08T05:19:53.937

You need to set SequenceFile as the input format:

job.setInputFormatClass(SequenceFileInputFormat.class);

You will find an example of reading a SequenceFile from HDFS here.
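
A minimal driver fragment showing that setup, as a sketch (the job name and input path are placeholders, and mapper/reducer wiring is omitted; uses Job, SequenceFileInputFormat, and FileInputFormat from org.apache.hadoop.mapreduce):

    // Sketch: configure a job to read a sequence file as its input.
    // "/user/me/docs.seq" is a placeholder path.
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "read-seqfile");
    job.setInputFormatClass(SequenceFileInputFormat.class);
    FileInputFormat.addInputPath(job, new Path("/user/me/docs.seq"));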

answered 2013-12-14T08:49:26.077