0

我尝试在返回完成线程的 Java 打印结果上制作多线程程序。问题是,当我运行此代码时,它只会卡在队列中的第二个值上:

        System.out.println("[!] Creaing pool");
        int max_threads     = 50;
        ExecutorService threadPool  = Executors.newFixedThreadPool(max_threads);
        CompletionService<String> taskCompletionService =
        new ExecutorCompletionService<String>(threadPool);
        String url;

        while(our_file.hasNext()){

            url = our_file.next();
            if (url.length()>0){

                futures.add(
                    taskCompletionService.submit(
                    new GoGo(url)
                    )
                    );
                    }



            int total_tasks = futures.size();

            while(total_tasks>0){
            for (int i=0; i<futures.size(); i++){

                try{    

                    Future result = taskCompletionService.poll();
                    if(result!=null && result.isDone()){
                        System.out.println(result.get());
                        total_tasks--;

                    }
                }
                catch (InterruptedException e) {
                    // Something went wrong with a task submitted
                    System.out.println("Error Interrupted exception");
                    e.printStackTrace();
                } catch (ExecutionException e) {
                    // Something went wrong with the result
                    e.printStackTrace();
                    System.out.println("Error get() threw exception");
                }

                }


            }

                 }


        threadPool.shutdown();
        try {
        threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
        }
        catch (InterruptedException e ) {
        }



...



     class GoGo implements Callable{

        private String url;

        public GoGo(String received_url){
            this.url = received_url;
        }

        public String call(){

            String URL = this.url;
            return url;

        }
    }

输出是这样的:

[!] Creaing pool
http://www.www1.com/
http://www.www2.ch/

而此时程序只是卡住了。我试图将迭代期货数组的循环移出提交线程的主循环,它工作得很好,但如果我要通过非常大的文件,我需要实时输出。请帮我找出瓶颈在哪里,我找不到任何合适的代码,使用来自 CompletionService 的非阻塞 poll() 方法。感谢您的任何回答或参考。

4

2 回答 2

1

问题是您试图在一个线程中同时做两件事(提交工作和阅读工作结果)。

这没有意义——对于同时执行的任务,您需要多个线程。

所以创建另一个线程来读取结果。或另一个线程来提交任务。你用哪种方法都没有关系。无论哪种方式,您最终都会得到 2 个线程而不是 1 个。

于 2013-10-27T15:31:51.537 回答
0

感谢 Robin Green 的建议,将未来的收割机类放到单独的线程中解决了这个问题!因此,我只需使用 poll() 启动 pop 参数的无限循环线程,检查弹出的未来对象是否指示线程 isDone() 并写入输出。关闭 fixedThreadPool 后,输出写入器类停止。这是代码(GoGo 类除外):

    public class headScanner {


        public static List<Future<String>> gloabal_futures   = new ArrayList<Future<String>>();


        public static void main(String args[]){

            Scanner our_file                 = null;
            ArrayList<String> our_urls       = new ArrayList<String>();
            List<Future<String>> futures     = new ArrayList<Future<String>>();
            ArrayList<String> urls_buffer    = new ArrayList<String>();


            try {
            our_file = new Scanner (new File ("list.txt"));
            }
            catch(IOException e){
                System.out.println("[-] Cant open the file!");
                System.exit(0);
            }

            System.out.println("[!] Creaing pool");
            int max_threads     = 50;
            ExecutorService threadPool  = Executors.newFixedThreadPool(max_threads);
            CompletionService<String> taskCompletionService =
            new ExecutorCompletionService<String>(threadPool);
            String url;



            Thread result_thread = new Thread(new ResultHarvester(futures.size(), taskCompletionService));
            result_thread.start();  

            while(our_file.hasNext()){

                url = our_file.next();
                if (url.length()>0){

                    futures.add(
                        taskCompletionService.submit(
                        new GoGo(url)
                        )
                        );
                        }

                }



            threadPool.shutdown();
            try {
            threadPool.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
            }
            catch (InterruptedException e ) {
            }

            result_thread.stop();

        }
    }


 class ResultHarvester implements Runnable {

     private int size;
     private CompletionService<String> all_service;

     public ResultHarvester (int size, CompletionService<String> service){
        this.size = size;
        this.all_service = service;
     }

     public void run(){
            int future_size = 1;
            CompletionService<String> this_service = this.all_service; 
                while(true){
                    Future result = this_service.poll();
                    try {
                            if(result!=null && result.isDone()){
                                String output = result.get().toString();
                                if(output.length()>1){
                                    System.out.println(output);
                                }

                            }
                        }

                    catch (InterruptedException e) {
                        // Something went wrong with a task submitted
                        System.out.println("Error Interrupted exception");
                        e.printStackTrace();
                    } catch (ExecutionException e) {
                        // Something went wrong with the result
                        e.printStackTrace();
                        System.out.println("Error get() threw exception");
                    }


                    }
                }

        }
于 2013-10-27T17:40:04.330 回答