3

我有一个接受一系列查询的方法,我需要针对不同的搜索引擎 Web API 运行它们,例如 Google 或 Yahoo 的。为了使该过程并行化,为每个查询生成一个线程,然后join在最后对其进行编辑,因为我的应用程序只能在获得每个查询的结果后才能继续。我目前有这些方面的东西:

public abstract class class Query extends Thread {
    private String query;

    public abstract Result[] querySearchEngine();
    @Override
    public void run() {
        Result[] results = querySearchEngine(query);
        Querier.addResults(results);
    }

}

public class GoogleQuery extends Query {
    public Result querySearchEngine(String query) { 
        // access google rest API
    }
}

public class Querier {
    /* Every class that implements Query fills this array */
    private static ArrayList<Result> aggregatedResults;

    public static void addResults(Result[]) { // add to aggregatedResults }

    public static Result[] queryAll(Query[] queries) {
        /* for each thread, start it, to aggregate results */
        for (Query query : queries) {
            query.start();
        }
        for (Query query : queries) {
            query.join();
        }
        return aggregatedResults;
    }
}

最近,我发现 Java 中有一个的API 用于执行并发作业。即Callable接口FutureTaskExecutorService。我想知道这个新 API 是否应该使用,它们是否比传统 API 更有效,Runnable以及Thread.

在研究了这个新的 API 之后,我想出了以下代码(简化版):

   public abstract class Query implements Callable<Result[]> {
        private final String query; // gets set in the constructor

        public abstract Result[] querySearchEngine();
        @Override
        public Result[] call() {
            return querySearchEngine(query);
        }
    }

public class Querier {   
        private ArrayList<Result> aggregatedResults;

        public Result[] queryAll(Query[] queries) {
            List<Future<Result[]>> futures = new ArrayList<Future<Result[]>>(queries.length);
            final ExecutorService service = Executors.newFixedThreadPool(queries.length);  
            for (Query query : queries) {
                futures.add(service.submit(query));  
            }
            for (Future<Result[]> future : futures) {  
                aggregatedResults.add(future.get());  // get() is somewhat similar to join?
            }  
            return aggregatedResults;
        }
    }

我是这个并发 API 的新手,我想知道上面的代码中是否有可以改进的地方,以及它是否比第一个选项(使用Thread)更好。有些课程我没有探索,例如FutureTask,等等。我也很想听听这方面的任何建议。

4

3 回答 3

7

您的代码有几个问题。

  1. 您可能应该使用 ExecutorService.invokeAll() 方法。创建新线程和新线程池的成本可能很高(尽管可能无法与调用外部搜索引擎相比)。invokeAll() 可以为您管理线程。
  2. 您可能不想混合使用数组和泛型。
  3. 您正在调用 aggregatedResults.add() 而不是 addAll()。
  4. 当成员变量可能是 queryAll() 函数调用的本地变量时,您不需要使用它们。

所以,类似下面的东西应该可以工作:

public abstract class Query implements Callable<List<Result>> {
    private final String query; // gets set in the constructor

    public abstract List<Result> querySearchEngine();
    @Override
    public List<Result> call() {
        return querySearchEngine(query);
    }
}

public class Querier {   
    private static final ExecutorService executor = Executors.newCachedThreadPool();

    public List<Result> queryAll(List<Query> queries) {
        List<Future<List<Result>>> futures = executor.submitAll(queries);
        List<Result> aggregatedResults = new ArrayList<Result>();
        for (Future<List<Result>> future : futures) {  
            aggregatedResults.addAll(future.get());  // get() is somewhat similar to join?
        }  
        return aggregatedResults;
    }
}
于 2009-07-19T15:17:38.463 回答
4

作为进一步的改进,您可以考虑使用CompletionService 它解耦提交和检索的顺序,而不是将所有未来的结果放在一个队列中,您可以按照完成的顺序从中获取结果。

于 2009-07-19T15:10:58.663 回答
3

我可以建议您使用Future.get() 超时吗?

否则,只需要一个搜索引擎没有响应就可以让一切停止(如果你最终遇到网络问题,它甚至不需要是搜索引擎问题)

于 2009-07-19T16:23:38.533 回答