4

我正在编写一个需要处理大量 URL 的 java 程序。
每个 URL 将按顺序运行以下作业:下载、分析、压缩

我希望每个作业都有固定数量的线程,而不是让一个线程在每个 URL 上一次完成所有作业,以便所有作业在任何给定时间都有线程同时运行。

例如,下载作业将有多个线程来获取和下载 URL,一旦下载了其中一个 URL,它就会将其传递给分析作业中的线程,一旦完成,它就会传递给压缩作业中的线程等。

我正在考虑在 java 中使用 CompletionService,因为它会在完成后立即返回结果,但我不确定它是如何工作的,到目前为止我的代码如下所示:

ExecutorService executor = Executors.newFixedThreadPool(3);
CompletionService<DownloadedItem> completionService = new ExecutorCompletionService<DownloadedItem>(executor);

//while list has URL do {
   executor.submit(new DownloadJob(list.getNextURL());//submit to queue for download
//}

//while there is URL left do {
   Future<DownloadedItem> downloadedItem = executor.take();//take the result as soon as it finish
   //what to do here??
//}

我的问题是如何将下载的项目移动到分析作业并在那里完成工作而无需等待所有下载作业完成?我正在考虑为每个工作创建一个 CompletionService,这是一种可行的方法吗?如果没有,是否有更好的替代方法来解决这个问题?请提供例子。

4

3 回答 3

3

一旦你提到IN ORDER任何尝试为那些按顺序任务使用单独的线程只会使你的系统设计复杂化。

在我看来,最好的办法是让单独的线程同时处理单个 URL。要执行这 3 个步骤,您可以引入另一个抽象(例如使用 3 个可调用对象),但您仍然希望在一个线程中按顺序执行它们。并且不需要完成服务。

于 2012-09-24T21:00:56.923 回答
1

你很接近。首先将您的任务提交给CompletionService

completionService.submit(new DownloadJob(list.getNextURL());

现在抓住Future并等待它:

DownloadedItem> downloadedItem = executor.take().get();

调用get()可能会阻塞。重复上述行的次数与您提交的项目数一样多。


如果您需要更大、更大的吞吐量(在您的情况下,一次最多下载三个 URL),请考虑async-http-client允许您同时从数以千计的 URL 下载。它使用 NIO 并且是事件驱动的,不涉及线程。

于 2012-09-24T20:59:53.380 回答
1

您所描述的称为Pipeline。基本上下载任务的输出是分析任务的输入。分析的输出是压缩的输入。似乎有两种选择可以做到这一点:

1)让下载任务知道输出的管道,以便它可以自己提交结果。

class DownloadTask implement Runnable {
    Executor analyzePipeline;
    public void run() {
        //Do download stuff
        analyzePipeline.submit(new AnalyzeTask(downloaded content));
    }
}

2) 允许另一个线程将下载任务的结果移动到分析任务的管道中。

ExecutorService executor = Executors.newFixedThreadPool(3);
ExecutorService analyzeExecutor = Executors.newFixedThreadPool(3);
CompletionService<DownloadedItem> completionService = new ExecutorCompletionService<DownloadedItem>(executor);

while list has URL do {
   executor.submit(new DownloadJob(list.getNextURL());//submit to queue for download
}

new Thread() {
    public void run() {
        while there is URL left do {
            Future<DownloadedItem> downloadedItem = executor.take();//take the result as soon as it finish
            analyzeExecutor.submit(new AnalyzeJob(downloadedItem.get());
        }
    }
};    
//...and so on
于 2012-09-24T21:22:04.733 回答