1

所以,我对多线程还很陌生,并且最近在我的所有程序中都使用了这个想法。在我开始更多地使用它之前,我真的想确保它是使用 Executor、CompletionService 和 BlockingQueue 以及 Observer 实现多线程的正确有效方法。我将在下面提供示例代码,但首先让我快速解释一下我认为它是如何工作的,也许这会有所帮助。

我拥有的第一件事是 BlockingQueue,所有任务都通过 add(Task task) 方法添加到此队列中。在创建类时,调用 run 方法并调用 while(true) 调用队列阻塞,直到将某些内容添加到任务队列中。

一旦某些东西被添加到 run() queue.take() 内的队列中,就会返回队列中的项目。然后我拿那个项目并将它传递给 WorkerThread 类,该类在它上面做一些事情。该 workerThread 被添加到处理等待线程完成的 CompletionService 池中。

好的,现在是我不确定是否正确的部分。我还有一个实现可运行的内部类,并在类初始化时启动。它的工作是永远循环调用 pool.take()。因此,这实质上是等待其中一个 WorkerThread 完成。我让完成服务处理这个。一旦 take() 得到一个值,内部类将它传递给一个通知观察者方法。

这可以实现吗?让我有点担心的是,主类在任务队列上运行了一个 while(true) 循环,而一个内部类也在循环等待池以接收来自 WorkerThread 的结果?

这是一个示例实现。你认为呢?

     public class HttpSchedulerThreaded extends Observable implements Runnable  {

private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
{
    numThreadsInPool = numThreads;
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public HttpSchedulerThreaded()
{
    numThreadsInPool = 1;
    executor = Executors.newFixedThreadPool(1);
    doneSignal = new CountDownLatch(1);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    schedulerWorker = new HttpSchedulerWorker();
    responseWorkerThread = new Thread(schedulerWorker);
    queue = new LinkedBlockingQueue<VulnInfo>();
}

public void setThreadCount(int numThreads)
{
    if(!isRunning){
    executor = Executors.newFixedThreadPool(numThreads);
    doneSignal = new CountDownLatch(numThreads);
    pool = new ExecutorCompletionService<VulnInfo>(executor);
    numThreadsInPool = numThreads;
    }
}


public void start()
{
    if(!isRunning){
        responseWorkerThread.start();
        new Thread(this).start();
        isRunning = true;
    }

}


public void add(VulnInfo info) {
    queue.add(info);
}

@Override
public void run() {
    // TODO Auto-generated method stub
    while(shouldRun)
    {   
        try {
            VulnInfo info = queue.take();
            Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
            //System.out.println("submitting to pooler: " + info.getID());
            pool.submit(worker);
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }   
    }
}

/**
 * Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
 * are complete it will send them to server for completion.
 * @author Steve
 *
 */
class HttpSchedulerWorker  implements Runnable{

    public void run() {
        // TODO Auto-generated method stub
        while(true)
        {
            VulnInfo vulnInfo = null;
            try {
                //System.out.println("taking finished request");
                Future<VulnInfo>    tmp = pool.take();
            //  Future<VulnInfo> tmp = pool.poll();
                if(tmp != null)
                    vulnInfo = tmp.get();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            } catch (ExecutionException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }

            if(vulnInfo != null)
            {
                //System.out.println("updating all observers: "  + vulnInfo.getID());
                updateObservers(vulnInfo);
            }



        }
    }

}
4

1 回答 1

2

根据我的经验,您的解决方案似乎没问题。我有三个意见/建议:

  1. 一旦你创建了一个新的执行线程responseWorkerThread = new Thread(schedulerWorker)responseWorkerThread.start(),你就基本上打破了这两个循环。这部分看起来还可以。您似乎确实Executor正确使用了 s API,但看起来您可能需要更多代码来停止HttpScheduledWorker线程并关闭ExecutionCompletionService作为HttpSchedulerThreaded类的一部分。
  2. 我不确定您的使用queue是否真的有必要。ExecutionCompletionService已经使用 aBlockingQueue来管理提交给它的任务。
  3. 您的“问题”可能更适合测试版代码审查网站。
于 2011-09-02T19:09:14.493 回答