所以,我对多线程还很陌生,并且最近在我的所有程序中都使用了这个想法。在我开始更多地使用它之前,我真的想确保它是使用 Executor、CompletionService 和 BlockingQueue 以及 Observer 实现多线程的正确有效方法。我将在下面提供示例代码,但首先让我快速解释一下我认为它是如何工作的,也许这会有所帮助。
我拥有的第一件事是 BlockingQueue,所有任务都通过 add(Task task) 方法添加到此队列中。在创建类时,调用 run 方法并调用 while(true) 调用队列阻塞,直到将某些内容添加到任务队列中。
一旦某些东西被添加到 run() queue.take() 内的队列中,就会返回队列中的项目。然后我拿那个项目并将它传递给 WorkerThread 类,该类在它上面做一些事情。该 workerThread 被添加到处理等待线程完成的 CompletionService 池中。
好的,现在是我不确定是否正确的部分。我还有一个实现可运行的内部类,并在类初始化时启动。它的工作是永远循环调用 pool.take()。因此,这实质上是等待其中一个 WorkerThread 完成。我让完成服务处理这个。一旦 take() 得到一个值,内部类将它传递给一个通知观察者方法。
这可以实现吗?让我有点担心的是,主类在任务队列上运行了一个 while(true) 循环,而一个内部类也在循环等待池以接收来自 WorkerThread 的结果?
public class HttpSchedulerThreaded extends Observable implements Runnable {
private ArrayList<Object> list;//holds [0]=VULNINFO, [1]=REQUESTBUILDER OBJECT
protected static Logger logger = Logger.getLogger(HttpScheduler.class.getName());
private CompletionService<VulnInfo> pool;
private ExecutorService executor ;
private Thread responseWorkerThread;
private HttpSchedulerWorker schedulerWorker;
private boolean shouldRun = true;
private CountDownLatch doneSignal;
private String[] vulnClassesIgnoreRedirect;
private boolean followRedirects;
private boolean runJavascriptInResponse;
private boolean isSSL;
private int numThreadsInPool;
private BlockingQueue<VulnInfo> queue;
private boolean isRunning ;
public HttpSchedulerThreaded(int numThreads)
numThreadsInPool = numThreads;
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
public HttpSchedulerThreaded()
numThreadsInPool = 1;
executor = Executors.newFixedThreadPool(1);
doneSignal = new CountDownLatch(1);
pool = new ExecutorCompletionService<VulnInfo>(executor);
schedulerWorker = new HttpSchedulerWorker();
responseWorkerThread = new Thread(schedulerWorker);
queue = new LinkedBlockingQueue<VulnInfo>();
public void setThreadCount(int numThreads)
executor = Executors.newFixedThreadPool(numThreads);
doneSignal = new CountDownLatch(numThreads);
pool = new ExecutorCompletionService<VulnInfo>(executor);
numThreadsInPool = numThreads;
public void start()
new Thread(this).start();
isRunning = true;
public void add(VulnInfo info) {
public void run() {
// TODO Auto-generated method stub
try {
VulnInfo info = queue.take();
Callable<VulnInfo> worker = new HttpSchedulerRequestSender(info,followRedirects,runJavascriptInResponse,vulnClassesIgnoreRedirect,doneSignal);
//System.out.println("submitting to pooler: " + info.getID());
} catch (InterruptedException e) {
// TODO Auto-generated catch block
* Inner class of proxy is a worker thread blocks until the pool has transactions complete as soon as they
* are complete it will send them to server for completion.
* @author Steve
class HttpSchedulerWorker implements Runnable{
public void run() {
// TODO Auto-generated method stub
VulnInfo vulnInfo = null;
try {
//System.out.println("taking finished request");
Future<VulnInfo> tmp = pool.take();
// Future<VulnInfo> tmp = pool.poll();
if(tmp != null)
vulnInfo = tmp.get();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
} catch (ExecutionException e) {
// TODO Auto-generated catch block
if(vulnInfo != null)
//System.out.println("updating all observers: " + vulnInfo.getID());