
I'm running a Spark cluster with 50 machines. Each machine is a VM with 8 cores and 50GB of memory (of which Spark appears to be able to use 41GB).

I'm running over several input folders, and I estimate the input to be about 250GB gz-compressed.

Although the number and configuration of machines I'm using seem sufficient to me, the job fails after about 40 minutes of running, and I can see the following errors in the logs:

2558733 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 345.0 in stage 1.0 (TID 345, hadoop-w-3.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: Java heap space

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

and:

2653545 [Result resolver thread-2] WARN org.apache.spark.scheduler.TaskSetManager  - Lost task 122.1 in stage 1.0 (TID 392, hadoop-w-22.c.taboola-qa-01.internal): java.lang.OutOfMemoryError: GC overhead limit exceeded

java.lang.StringCoding$StringDecoder.decode(StringCoding.java:149)
java.lang.StringCoding.decode(StringCoding.java:193)
java.lang.String.<init>(String.java:416)
java.lang.String.<init>(String.java:481)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:699)
com.doit.customer.dataconverter.Phase0$3.call(Phase0.java:660)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$7$1.apply(JavaRDDLike.scala:164)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:596)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
org.apache.spark.scheduler.Task.run(Task.scala:54)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
java.lang.Thread.run(Thread.java:745)

How can I debug a problem like this?

Edit: I found the root cause of the problem. It is this code:

    private static final int MAX_FILE_SIZE = 40194304;
    ....
    ....
        JavaPairRDD<String, List<String>> typedData = filePaths.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, List<String>>() {
            @Override
            public Iterable<Tuple2<String, List<String>>> call(Iterator<String> filesIterator) throws Exception {
                List<Tuple2<String, List<String>>> res = new ArrayList<>();
                String fileType = null;
                List<String> linesList = null;
                if (filesIterator != null) {
                    while (filesIterator.hasNext()) {
                        try {
                            Path file = new Path(filesIterator.next());
                            // filter non-trc files
                            if (!file.getName().startsWith("1")) {
                                continue;
                            }
                            fileType = getType(file.getName());
                            Configuration conf = new Configuration();
                            CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                            CompressionCodec codec = compressionCodecs.getCodec(file);
                            FileSystem fs = file.getFileSystem(conf);
                            ContentSummary contentSummary = fs.getContentSummary(file);
                            long fileSize = contentSummary.getLength();
                            InputStream in = fs.open(file);
                            if (codec != null) {
                                in = codec.createInputStream(in);
                            } else {
                                throw new IOException();
                            }

                            byte[] buffer = new byte[MAX_FILE_SIZE];

                            BufferedInputStream bis = new BufferedInputStream(in, BUFFER_SIZE);
                            int count = 0;
                            int bytesRead = 0;
                            try {
                                while ((bytesRead = bis.read(buffer, count, BUFFER_SIZE)) != -1) {
                                    count += bytesRead;
                                }
                            } catch (Exception e) {
                                log.error("Error reading file: " + file.getName() + ", trying to read " + BUFFER_SIZE + " bytes at offset: " + count);
                                throw e;
                            }

                            Iterable<String> lines = Splitter.on("\n").split(new String(buffer, "UTF-8").trim());
                            linesList = Lists.newArrayList(lines);

                            // get rid of first line in file

                            Iterator<String> it = linesList.iterator();
                            if (it.hasNext()) {
                                it.next();
                                it.remove();
                            }
                            //res.add(new Tuple2<>(fileType,linesList));
                        } finally {
                            res.add(new Tuple2<>(fileType, linesList));
                        }


                    }

                }
                return res;
            }
        });

In particular, a buffer of 40MB is allocated for every file in order to read the file's content with a BufferedInputStream. This causes the heap to run out of memory at some point (hence the Java heap space errors above).

The thing is:

  • If I read line by line (which would not require the buffer), the reading becomes very inefficient.
  • If I allocate one buffer and reuse it for every file read: is that even possible given that the files are read in parallel, or would it get overwritten by multiple threads? (A possible approach is sketched below.)

Any suggestions are welcome...
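For the second point, here is a minimal sketch (not the original code) of allocating the buffer once per partition: it is created inside call(), before the per-file loop, so every file in the partition reuses it. Each Spark task deserializes its own copy of the function and runs call() on its own thread, so the buffer is never shared between concurrently running tasks. The class name ReadFilesPerPartition is made up, and the Spark 1.x PairFlatMapFunction signature from the snippet above (call returning an Iterable) is assumed.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    import org.apache.spark.api.java.function.PairFlatMapFunction;

    import scala.Tuple2;

    // Sketch only: same shape as the original mapPartitionsToPair function, but the
    // 40MB buffer is allocated once per partition instead of once per file.
    public class ReadFilesPerPartition implements PairFlatMapFunction<Iterator<String>, String, List<String>> {

        private static final int MAX_FILE_SIZE = 40194304; // same constant as in the question

        @Override
        public Iterable<Tuple2<String, List<String>>> call(Iterator<String> filesIterator) throws Exception {
            List<Tuple2<String, List<String>>> res = new ArrayList<>();
            // One allocation per task/partition, reused for every file below.
            byte[] buffer = new byte[MAX_FILE_SIZE];
            while (filesIterator != null && filesIterator.hasNext()) {
                String filePath = filesIterator.next();
                // Open filePath and read its content into `buffer` exactly as in the
                // original snippet, tracking `count` = bytes actually read for this
                // file, then build the (fileType, lines) tuple and add it to res.
            }
            return res;
        }
    }

Passing an instance of this class to filePaths.mapPartitionsToPair(...) would then behave like the original snippet, minus the per-file allocation.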

Edit 2: I fixed the first memory problem by moving the byte-array allocation outside the iterator, so it is reused for all of the partition's elements. But there is still the new String(buffer, "UTF-8").trim() that gets created for the split, and that is an object created every time. I could use a StringBuffer/StringBuilder, but then how do I set the charset encoding without a String object?
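As for the literal question of setting the charset without going through new String(...): one option (a sketch, not from the original post) is java.nio.charset.CharsetDecoder, which decodes a ByteBuffer view of the reused byte array directly into a CharBuffer. It still allocates one char buffer the size of the decoded text, but it skips the extra copy made by .trim() and lets the line split run over the CharBuffer; the answer below avoids the issue altogether by streaming through an InputStreamReader. The class and method names here are made up for illustration.

    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.CharBuffer;
    import java.nio.charset.CharsetDecoder;
    import java.nio.charset.CodingErrorAction;
    import java.nio.charset.StandardCharsets;
    import java.util.ArrayList;
    import java.util.List;

    public class BufferDecoder {

        // One decoder per task, reused for every file (just like the byte buffer).
        // REPLACE mimics new String(bytes, "UTF-8"), which substitutes bad bytes
        // instead of throwing.
        private final CharsetDecoder utf8Decoder = StandardCharsets.UTF_8.newDecoder()
                .onMalformedInput(CodingErrorAction.REPLACE)
                .onUnmappableCharacter(CodingErrorAction.REPLACE);

        // Decodes the first `count` bytes of `buffer` as UTF-8 and splits the result
        // into lines, without first building one big String for the whole file.
        public List<String> splitLines(byte[] buffer, int count) throws IOException {
            CharBuffer chars = utf8Decoder.decode(ByteBuffer.wrap(buffer, 0, count));
            List<String> lines = new ArrayList<String>();
            int lineStart = 0;
            for (int i = 0; i < chars.length(); i++) {
                if (chars.charAt(i) == '\n') {
                    lines.add(chars.subSequence(lineStart, i).toString());
                    lineStart = i + 1;
                }
            }
            if (lineStart < chars.length()) {
                lines.add(chars.subSequence(lineStart, chars.length()).toString());
            }
            return lines;
        }
    }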


1 Answer


In the end I changed the code as follows:

       // Transform list of files to list of all files' content in lines grouped by type
        JavaPairRDD<String,List<String>> typedData = filePaths.mapToPair(new PairFunction<String, String, List<String>>() {
            @Override
            public Tuple2<String, List<String>> call(String filePath) throws Exception {
                Tuple2<String, List<String>> tuple = null;
                try {
                    String fileType = null;
                    List<String> linesList = new ArrayList<String>();
                    Configuration conf = new Configuration();
                    CompressionCodecFactory compressionCodecs = new CompressionCodecFactory(conf);
                    Path path = new Path(filePath);
                    fileType = getType(path.getName());
                    tuple = new Tuple2<String, List<String>>(fileType, linesList);

                    // filter non-trc files
                    if (!path.getName().startsWith("1")) {
                        return tuple;
                    }

                    CompressionCodec codec = compressionCodecs.getCodec(path);
                    FileSystem fs = path.getFileSystem(conf);
                    InputStream in = fs.open(path);
                    if (codec != null) {
                        in = codec.createInputStream(in);
                    } else {
                        throw new IOException();
                    }

                    BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), BUFFER_SIZE);
                    // Get rid of the first line in the file
                    r.readLine();

                    // Read all lines
                    String line;
                    while ((line = r.readLine()) != null) {
                        linesList.add(line);
                    }
                } catch (IOException e) { // Filtering of files whose reading went wrong
                    log.error("Reading of the file " + filePath + " went wrong: " + e.getMessage());
                } finally {
                    return tuple;
                }
            }

        });

So instead of using a 40MB buffer, I now build the list of lines dynamically with an ArrayList. This solved my current memory problem, but now I'm getting other strange errors that make the job fail. I'll report those in a separate question...
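One small aside about the code above (not part of the original answer): the reader is never closed, so the decompression and HDFS streams stay open until they are garbage collected. A self-contained sketch of the same read loop with try-with-resources, assuming Java 7+ (the class and method names are made up):

    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.util.ArrayList;
    import java.util.List;

    public class LineReader {

        // Same read loop as in the answer, but the reader (and the wrapped
        // decompression / HDFS streams) is always closed, even when reading fails.
        public static List<String> readAllButFirstLine(InputStream in, int bufferSize) throws IOException {
            List<String> linesList = new ArrayList<String>();
            try (BufferedReader r = new BufferedReader(new InputStreamReader(in, "UTF-8"), bufferSize)) {
                r.readLine(); // skip the header line
                String line;
                while ((line = r.readLine()) != null) {
                    linesList.add(line);
                }
            }
            return linesList;
        }
    }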

Answered on 2014-10-24T22:44:59.657