我有一个涉及多线程的棘手问题。我所做的是使用一个线程池 ( ExecutorService
),该线程池的任务是打开连接并将它们放入LinkedBlockingQueue
.
到目前为止,我使用过:
//run method in "getter threads"
public void run() {
try {
URL url = new URL(url_s); //url_s is given as a constructor argument
//if I am correct then url.openStream will wait until we have the content
InputStream stream = url.openStream();
Request req = new Request(); //a class with two variables:
req.html_stream = new InputSource(stream);
req.source = stream;
//this is a class variable (LinkedBlockingQueue<Request>)
blocking_queue.put(req);
} catch (Exception ex) {
logger.info("Getter thread died from an exeption",ex);
return;
}
}
然后我有消费者线程 (java.lang.Thread) 接受这些InputSource
s 和InputStream
s 并执行以下操作:
public void run() {
while(running) {
try {
logger.info("waiting for data to eat");
Request req = blocking_queue.take();
if(req.html_stream != null)
eat_data(req);
} catch (Exception ex) {
logger.error(ex);
return;
}
}
}
eat_data 调用一个采用 InputSource 的外部库。该库使用单例实例进行处理,因此我不能将此步骤放在“getter”线程中。
当我针对少量数据测试这段代码时,它运行良好,但是当我为它提供数千个 URL 时,我开始遇到真正的问题。确切地找出问题所在并不容易,但我怀疑连接在消费者线程到达之前就超时了,有时甚至会导致死锁。
我以这种方式实现它是因为从 url.openStream() 到 InputSource 非常容易,但我意识到我确实必须在本地存储数据才能使其工作。
如何从 url.openStream() 获取可以存储在我的LinkedBlockingQueue
(内存中的所有数据)中的某个对象,当我的消费者线程有时间处理它时,我可以将其转换为 InputSoruce?