2

我有一个涉及多线程的棘手问题。我所做的是使用一个线程池 ( ExecutorService),该线程池的任务是打开连接并将它们放入LinkedBlockingQueue.

到目前为止,我使用过:

//run method in "getter threads"
public void run() {

    try {

    URL url = new URL(url_s); //url_s is given as a constructor argument

    //if I am correct then url.openStream will wait until we have the content
    InputStream stream = url.openStream();

    Request req = new Request(); //a class with two variables:
    req.html_stream = new InputSource(stream);
    req.source = stream;

    //this is a class variable (LinkedBlockingQueue<Request>)
    blocking_queue.put(req);

    } catch  (Exception ex) {
    logger.info("Getter thread died from an exeption",ex);
    return;
    }
}

然后我有消费者线程 (java.lang.Thread) 接受这些InputSources 和InputStreams 并执行以下操作:

public void run() {
   while(running) {
        try {
            logger.info("waiting for data to eat");
            Request req = blocking_queue.take();
            if(req.html_stream != null)
            eat_data(req);
        } catch (Exception ex) {
            logger.error(ex);
            return;
        }
   }
}

eat_data 调用一个采用 InputSource 的外部库。该库使用单例实例进行处理,因此我不能将此步骤放在“getter”线程中。

当我针对少量数据测试这段代码时,它运行良好,但是当我为它提供数千个 URL 时,我开始遇到真正的问题。确切地找出问题所在并不容易,但我怀疑连接在消费者线程到达之前就超时了,有时甚至会导致死锁。

我以这种方式实现它是因为从 url.openStream() 到 InputSource 非常容易,但我意识到我确实必须在本地存储数据才能使其工作。

如何从 url.openStream() 获取可以存储在我的LinkedBlockingQueue(内存中的所有数据)中的某个对象,当我的消费者线程有时间处理它时,我可以将其转换为 InputSoruce?

4

1 回答 1

2

您可以将 URL 的内容复制到 aByteArrayOutputStream并关闭 URL Stream。然后将其存储ByteArrayInputStream在队列中。

伪代码:

InputStream in = null;
try {
    in = url.openStream();
    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    IOUtils.copy(in, buffer);

    ByteArrayInputStream bin = new ByteArrayInputStream(buffer.toByteArray());
    queue.put(bin);
}

参考 :

  1. java.io.ByteArrayInputStream
  2. java.io.ByteArrayOutputStream
  3. org.apache.commons.io.IOUtils.IOUtils
于 2012-10-01T05:52:48.817 回答