4

假设以下场景:

我们有一个来自某个库的 Java 类,它消耗字节流,比如某个XmlParser1公开方法的XML 解析器xmlParser1.parse(inputStream);该方法通常会在一次调用中吃掉所有字节,最终阻塞。我们还有另一个类,来自其他一些库,它做类似的事情,但实现不同:XmlParser2with xmlParser2.parse(inputStream)。现在,我们想用两个解析器解析一个流。

我的第一个答案是:我们搞砸了。由于我们无法控制每个类如何使用流,我们所能做的就是缓冲内存中的所有字节或临时文件(或打开/重新打开它,如果可能的话)。这些消费者的 API 本质上是不合作的。

现在,假设我们可以控制XmlParser1(实现和签名)并且我们希望以更灵活和协作的方式对其进行编码,以便调用者可以以某种合理和有效的方式实现上述行为......你有什么建议?

我正在考虑的一些替代方案:

1) make XmlParser1implement FilterInputStream,这样当某个类 ( XmlParser1) 尝试从中读取一些字节时,它会在内部解析它必须的内容(迭代地,也许有一些合理的缓冲)并返回原始字节。(这并不完全符合这个FilterInputStream概念,我会说)。通过这种方式,客户端代码可以简单地链接解析器:

   public class XmlParser1 extends FilterInputStream {
       public XmlParser1(InputStream rawInputStream) { ... } 
       public int read(byte[] b, int off, int l) throws IOException {
           // this would invoke the underlying stream read, parse internall the read bytes,
           // and leave them in the buffer
       }
   }

   XmlParser1 parser1 = new XmlParser1(inputstream);
   XmlParser2 parser2 = new XmlParser2(parse); 
   parser2.parse(); // parser2 consumes all the input stream, which causes parser1 to read an parse it too

2)与其将 atXmlParser1视为字节的消费者,不如将其视为一个接收:我们不会让它吃掉自己的字节,我们会用勺子喂它。所以,不是xmlParser1.parse(inputStream)我们可以有 xmlParser1.write(byte[])... 也就是说,不是传递它 anInputStream我们将使它成为一个OutputStream. 这将允许客户端创建一个 TeeInputStream 将字节透明地传递给XmlParser2类,同时调用XmlParser1.write()

请注意,在任何情况下我们都不需要单独的线程。

我不确定哪一个(如果有的话)在概念上更可取,是否有更好的选择。在我看来,这是一个应该已经讨论过的设计问题,但我没有找到太多 - 不一定仅限于 Java。欢迎提出意见和参考。

4

5 回答 5

1

假设两个解析器在两个单独的线程中运行,它可能是这样的(不是工作代码)

public class Test extends FilterInputStream {
    byte[] buf = new byte[8192];
    int len;
    Thread thread = null;

    @Override
    public synchronized int read(byte[] b, int off, int l) throws IOException {
        while (thread == Thread.currentThread() && len > 0) {
            thread.wait();
        }
        if (len > 0) {
            System.arraycopy(buf, 0, b, off, l);
            len = 0;
            return l;
        }
        len = super.read(b, off, l);
        System.arraycopy(b, off, buf, 0, len);
        thread = Thread.currentThread();
        notify();
        return len;
    }

也就是说,#1 读取字节并将它们保存在 buf 中,#1 的下一次尝试被阻止,直到 #2 从缓冲区中读取所有内容

于 2013-01-04T06:03:18.287 回答
1

如果您的线程在同一台服务器上,那么您拆分 InputStreams 的想法将没有任何意义。因为您仍将只使用一个 InputStream 和一个 BufferedInputStream 来获取数据,从 InputStreams 中创建对象,然后在两个不同的运行线程中使用这些对象。结论:在 Java 中,任何时候都没有必要阻塞任何 InputStream。我什至认为它是有害的,因为如果你阻塞,如果你的缓冲区或管道溢出会发生什么?队列溢出!

编辑:如果你想停止一个流,你需要告诉发件人不要再发送任何数据。或者你像 youtube 那样做,他们将视频分成几部分(即 1 分钟的 1 部分)并且只预加载那些部分,所以停止视频根本不会影响预加载,因为它只有在你到达时间轴中的某个位置时才会预加载(比如在 45 秒、1 分 45 秒、2 分 45 秒等)。 ,这实际上只是一种预加载技术,并没有真正的流媒体,这就是为什么 Youtube 不需要搞砸丢包的原因。)

但是,我仍然有几行伪代码给你,客户端:

BufferedOutputStream bos = new BufferedOutputStream(/*yourBasicInputStream*/);
ObjectOutputStream oos = new ObjectOutputStream(bos);  //Or use another wrapper
oos.writeObject(yourObjectToSend);      //Or use another parser: Look into the API: ObjectInputStream

主线程控制器(又名服务器)中的类变量:

Thread thread1;  //e.g. a GUI controller
Thread thread2;  //e.g. a DB controller

服务器(或由服务器启动的另一个服务器线程,两个线程都作为参数):

BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/);
ObjectInputStream ois = new ObjectInputStream(bis);   //Or use another wrapper
//now we use an interface MyNetObject implementing the method getTarget(), but
//also an abstract class would be possible (with a more complex getTarget-method):
MyNetObject netObject = (MyNetObject) ois.readObject();   //Or use another parser...
if(netObject.getTarget()=="Thread1ClassANDThread2Class"){
    thread1.activateSync(netObject);        //notify...  
    thread2.activateSync(netObject);        //...both threads!
}
else if(netObject.getTarget()=="Thread1Class"){
    thread1.activate(netObject);        //give thread1 a notification
}
else if(netObject.getTarget()=="Thread2Class"){
    thread2.activate(netObject);        //give thread2 a notification
}
else {//do something else...}

不要忘记同步“activateSync(netObject)”-方法,但前提是您想对对象进行任何更改(您不需要同步读数,只需同步写作):

public void activateSync(MyNetObject netObject){
    synchronize(netObject){
        //do whatever you wanna do with the object...the other thread will always get the actual object!
    }
}

这很容易、快速、一致……并且完全面向对象。希望你能明白这一点。;)

更新:

重要的是要了解流或阅读器实际上也是“解析器”。有一个重要区别:流(通常)是网络驱动的类,用于写入和读取任何类型的数据 - 除了字符。虽然阅读器用于阅读任何类型的文本/字符。所以你正确的实现是这样的:用一些流读取传入的数据包,然后将数据存储到适当的对象中。然后你就有了一个通用对象,你可以在任何类型的阅读器中使用它。如果您只有一张图片要阅读,您可以尝试readUTF()类中的解析器ObjectInputStreamhttp://docs.oracle.com/javase/1.4.2/docs/api/java/io/ObjectInputStream.html),它会产生一个字符串:

BufferedInputStream bis = new BufferedInputStream(/*yourBasicInputStream*/);
ObjectInputStream ois = new ObjectInputStream(bis);
String string = ois.readUTF();   //Or another usable parser/method
XmlParser1.read(string);      //for reads there is...
XmlParser2.read(string);      //...no synchronisation needed!

现在,唯一剩下的就是教解析器如何读取该字符串。而对象字符串本身可以被视为“接收器”。如果这对您不起作用,只需找到另一个解析器/方法来创建“接收器”对象。

请注意,这里讨论的解决方案 - 使用类 ObjectInputStream 和适当的解析器 - 在许多情况下也适用,也适用于大数据(然后你只需在通过网络发送之前将一个 1GB 的文件分割成几个字符串/对象“数据包”,就像种子一样)。但它不适用于视频/音频流,您可能会丢包并且需要完全不同的解决方案(这本身就是一门科学:http ://www.google.ch/search?q=video+stream+数据包+丢弃)。

于 2013-02-07T20:35:55.573 回答
0

我尝试将原始输入流拉入 Apache Commons TeeInputStream 以创建输出流。 http://commons.apache.org/io/api-release/org/apache/commons/io/input/TeeInputStream.html

作为 IeeInputStream 要写入的 OutputStream,我使用了 java PipedOutputStream。

我将此 PipedOutputStream 连接到 java PipedInputStream。

这让我能够阅读 TeeInputStream 和 PipedInputStream。不确定它是否适合您,或者它是否至少会提供下一步。

我创建了一个 ReaderThread 类来检查我是否可以并行读取它们:

  private static class ReaderThread extends Thread
  {
    InputStream inStream;
    int threadId;
    public ReaderThread(int threadId, InputStream inStream)
    {
      this.inStream = inStream;
      this.threadId = threadId;
    }

    @Override
    public void run()
    {
      try
      {
        int c = inStream.read();
        while (c != -1)
        {
          System.out.println("From ("+threadId+ ") "+c);
          c = inStream.read();
        }
      }
      catch (Exception e)
      {
        e.printStackTrace();
      }
    }
  }

然后由以下代码驱动:

InputStream inStream = new FileInputStream(fileName);
PipedInputStream pipedInStream = new PipedInputStream();
OutputStream pipedOutStream = new PipedOutputStream(pipedInStream);
TeeInputStream tin = new TeeInputStream(inStream,
  pipedOutStream);

ReaderThread firstThread = new ReaderThread(1,tin);
ReaderThread secondThread = new ReaderThread(2,pipedInStream);

firstThread.start();
secondThread.start();
于 2013-02-07T17:26:21.923 回答
0

你说过“那些消费者的 API 本质上是不合作的。” 所以不要试图制造它们,保持隔离并给他们想要的东西。分离输入流。

让一个线程读取真实的输入流并写入两个输出流。

然后从这些输出流中创建输入流,您可以使用 Piped Streams 执行此操作

pipedInputStream1 = new PipedInputStream(pipedOutputStream1);

ByteArrayInputStream(((ByteArrayOutputStream) byteOutputStream1).toByteArray());

于 2013-02-11T17:12:19.937 回答
0

有点不清楚 XmlParse1 和 XmlParser2 内部发生了什么,但假设他们真正关心最终的 XML 数据而不是 InputStream 字节,我会切换到 StAX XMLEvent api。您可以使两个解析器都实现XMLEventConsumer。然后,您只有一个外部循环,它解析实际流并将事件传递给消费者:

public void parseXml(InputStream stream) {
  XMLEventReader reader = ...; // convert stream into XMLEventReader

  XMLConsumer[] consumers = new XMLConsumer[]{new XmlParser1(), new XmlParser2()};

  while(reader.hasNext()) {
    XMLEvent event = reader.nextEvent();
    for(XMLConsumer consumer : consumers) {
      consumer.add(event));
  }
}
于 2013-02-08T19:08:54.933 回答