1

我构建了一个基本的 Web 解析器,它使用 hadoop 将 url 传递给多个线程。这工作得很好,直到我到达我的输入文件的末尾,Hadoop 声明自己已经完成,同时仍有线程在运行。这会导致错误 org.apache.hadoop.fs.FSError: java.io.IOException: Stream Closed。有没有办法让流保持足够长的时间让线程完成?(我可以以合理的准确度预测线程将在单个 url 上花费的最长时间)。

这是我执行线程的方式

public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();
        private URLPile pile = new URLPile();
        private MSLiteThread[] Threads = new MSLiteThread[16];
        private boolean once = true;

        @Override
        public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) {

            String url = value.toString();
            StringTokenizer urls = new StringTokenizer(url);
            Config.LoggerProvider = LoggerProvider.DISABLED;
             System.out.println("In Mapper");
            if (once) {
                for (MSLiteThread thread : Threads) {
                    System.out.println("created thread");
                    thread = new MSLiteThread(pile);
                    thread.start();
                }
                once = false;
            }

            while (urls.hasMoreTokens()) {
                try {
                    word.set(urls.nextToken());
                    String currenturl = word.toString();
                    pile.addUrl(currenturl, output);

                } catch (Exception e) {
                    e.printStackTrace();
                    continue;
                }

            }

        }

线程本身得到这样的网址

    public void run(){
            try {
            sleep(3000);
                while(!done()){
                    try {
                    System.out.println("in thread");
                      MSLiteURL tempURL = pile.getNextURL();
                      String currenturl = tempURL.getURL();
                      urlParser.parse(currenturl);
                      urlText.set("");
                      titleText.set(currenturl+urlParser.export());
                      System.out.println(urlText.toString()+titleText.toString());
                      tempURL.getOutput().collect(urlText, titleText);
                      pile.doneParsing();
                     sleep(30);
                    } catch (Exception e) {
                          pile.doneParsing();
                    e.printStackTrace();
                        continue;
                    }
                }
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            System.out.println("Thread done");

        }

而urlpile中的相关方法是

public synchronized void addUrl(String url,OutputCollector<Text, Text> output) throws InterruptedException {
        while(queue.size()>16){
            System.out.println("queue full");
            wait();
        }
        finishedParcing--;
        queue.add(new MSLiteURL(output,url));
        notifyAll();
    }

    private Queue<MSLiteURL> queue = new LinkedList<MSLiteURL>();
    private int sent = 0;
    private int finishedParcing = 0;
    public synchronized MSLiteURL getNextURL() throws InterruptedException {

        notifyAll();
        sent++;
        //System.out.println(queue.peek());
        return queue.remove();

    }
4

1 回答 1

1

正如我可以从下面的评论中推断的那样,您可能可以在每个 map() 函数中执行此操作以使事情变得简单。我看到您执行以下操作,以预先创建一些空闲线程。您可以将以下代码移至

if (once) {
  for (MSLiteThread thread : Threads) {
     System.out.println("created thread");
     thread = new MSLiteThread(pile);
     thread.start();
  }
once = false;
}

至,

public static class Map extends MapReduceBase implements
            Mapper<LongWritable, Text, Text, Text> {
    @Override
    public void configure(JobConf job) {
       for (MSLiteThread thread : Threads) {
         System.out.println("created thread");
         thread = new MSLiteThread(pile);
         thread.start();
       }
    }

    @Override
    public void map(LongWritable key, Text value,
       OutputCollector<Text, Text> output, Reporter reporter) {
    }

}

因此,这可以被初始化一次,就此而言,不再需要“一次”条件检查。

此外,您不需要像上面那样制作空闲线程。我不知道这样创建 16 个空闲线程会获得多少性能提升。

无论如何,这是一个解决方案(虽然可能并不完美)

您可以使用类似 countdownlatch 的东西在这里阅读更多内容,以 N 或更多批次处理您的 url,并阻止它们直到它们完成。这是因为,如果您将每个传入的 url 记录释放到一个线程,则将立即获取下一个 url,并且当您以相同方式处理最后一个 url 时,即使您有剩余线程,map() 函数也会返回在队列中处理。您将不可避免地遇到您提到的异常。

这里有一个例子,说明你有多大可能使用倒计时闩锁。

 public static class Map extends MapReduceBase implements
                Mapper<LongWritable, Text, Text, Text> {

            @Override
            public void map(LongWritable key, Text value,
                OutputCollector<Text, Text> output, Reporter reporter) {

                String url = value.toString();
                StringTokenizer urls = new StringTokenizer(url);
                Config.LoggerProvider = LoggerProvider.DISABLED;

            //setting countdownlatch to urls.countTokens() to block off that many threads.
            final CountDownLatch latch = new CountDownLatch(urls.countTokens());
            while (urls.hasMoreTokens()) {
                try {
                    word.set(urls.nextToken());
                    String currenturl = word.toString();
                    //create thread and fire for current URL here
                    thread = new URLProcessingThread(currentURL, latch);
                    thread.start();
                } catch (Exception e) {
                    e.printStackTrace();
                    continue;
                }

            }

          latch.await();//wait for 16 threads to complete execution
          //sleep here for sometime if you wish

        }

    }

最后,在 URLProcessingThread 中,一旦处理了 URL,就会减少闩锁计数器,

public class URLProcessingThread implments Runnable {
    CountDownLatch latch;
    URL url;
    public  URLProcessingThread(URL url,  CountDownLatch latch){
       this.latch = latch;
       this.url = url;
    }
    void run() {
         //process url here
         //after everything finishes decrement the latch
         latch.countDown();//reduce count of CountDownLatch by 1

    }
}

您的代码可能出现问题:pile.addUrl(currenturl, output);,当您添加一个新的 url 时,同时所有 16 个线程都会获得更新(我不太确定),因为同一个对象被传递给 16 个线程。您的网址有可能被重新处理,或者您可能会得到一些其他副作用(我对此不太确定)。

其他建议:

此外,您可能希望使用增加地图任务超时

mapred.task.timeout

(默认 = 600000 毫秒)= 10 分钟

描述:如果任务既不读取输入,也不写入输出,也不更新其状态字符串,任务将终止前的毫秒数。

您可以在 mapred-site.xml 中添加/覆盖此属性

于 2013-07-19T17:25:06.497 回答