我有一个涉及多线程的棘手问题。我所做的是使用一个线程池 ( ExecutorService),该线程池的任务是打开连接并将它们放入LinkedBlockingQueue.
到目前为止,我使用过:
//run method in "getter threads"
public void run() {
    try {
    URL url = new URL(url_s); //url_s is given as a constructor argument
    //if I am correct then url.openStream will wait until we have the content
    InputStream stream = url.openStream();
    Request req = new Request(); //a class with two variables:
    req.html_stream = new InputSource(stream);
    req.source = stream;
    //this is a class variable (LinkedBlockingQueue<Request>)
    blocking_queue.put(req);
    } catch  (Exception ex) {
    logger.info("Getter thread died from an exeption",ex);
    return;
    }
}
然后我有消费者线程 (java.lang.Thread) 接受这些InputSources 和InputStreams 并执行以下操作:
public void run() {
   while(running) {
        try {
            logger.info("waiting for data to eat");
            Request req = blocking_queue.take();
            if(req.html_stream != null)
            eat_data(req);
        } catch (Exception ex) {
            logger.error(ex);
            return;
        }
   }
}
eat_data 调用一个采用 InputSource 的外部库。该库使用单例实例进行处理,因此我不能将此步骤放在“getter”线程中。
当我针对少量数据测试这段代码时,它运行良好,但是当我为它提供数千个 URL 时,我开始遇到真正的问题。确切地找出问题所在并不容易,但我怀疑连接在消费者线程到达之前就超时了,有时甚至会导致死锁。
我以这种方式实现它是因为从 url.openStream() 到 InputSource 非常容易,但我意识到我确实必须在本地存储数据才能使其工作。
如何从 url.openStream() 获取可以存储在我的LinkedBlockingQueue(内存中的所有数据)中的某个对象,当我的消费者线程有时间处理它时,我可以将其转换为 InputSoruce?