0

我有两个独立的 ChronicleQueues,它们是由独立线程创建的,它们监视 Java 应用程序中的 Web 套接字流。当我在单独的单线程程序中独立读取每个队列时,我可以按预期遍历每个整个队列 - 使用以下最少的代码:

final ExcerptTailer queue1Tailer = queue1.createTailer();
final ExcerptTailer queue2Tailer = queue2.createTailer();

while (true)
{
   try( final DocumentContext context = queue1Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter1++;
      queue1Data = context.wire()
                           .bytes()
                           .readObject(Queue1Data.class);

      queue1Writer.write(String.format("%d\t%d\t%d%n", counter1, queue1Data.getEventTime(), queue1Data.getEventContent()));
   }
}

while (true)
{
   try( final DocumentContext context = queue2Tailer.readingDocument() )
   {
      if ( isNull(context.wire()) )
         break;

      counter2++;
      queue2Data = context.wire()
                           .bytes()
                           .readObject(Queue2Data.class);

      queue2Writer.write(String.format("%d\t%d\t%d%n", counter2, queue2Data.getEventTime(), queue2Data.getEventContent()));
   }
}

在上面,我能够读取所有 Queue1Data 对象,然后是所有 Queue2Data 对象并按预期访问值。但是,当我尝试交错读取队列时(根据 Queue1Data 对象的属性(时间戳)从一个队列中读取对象,读取 Queue2Data 对象,直到时间戳之后的第一个对象(下面的限制变量) ,找到活动的 Queue1Data 对象 - 然后对其进行处理)仅读取 queue2Tailer 中的一个对象后,抛出异常.DecoratedBufferUnderflowException: readCheckOffset0 failed。失败的简化代码如下(我尝试将外while(true)循环放在queue2Tailer try块的内部和外部):

final ExcerptTailer queue1Tailer = queue1Queue.createTailer("label1");

try( final DocumentContext queue1Context = queue1Tailer.readingDocument() )
{
   final ExcerptTailer queue2Tailer = queue2Queue.createTailer("label2");
    
   while (true)
   {
      try( final DocumentContext queue2Context = queue2Tailer.readingDocument() )
      {
         if ( isNull(queue2Context.wire()) )
         {
            terminate = true;
            break;
         }
         queue2Data = queue2Context.wire()
                                   .bytes()
                                   .readObject(Queue2Data.class);
         while(true)
         {
            queue1Data = queue1Context.wire()
                                          .bytes()
                                                  .readObject(Queue1Data.class);  // first read succeeds
                                                  
            if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues
            {                                         // but the second read fails
               // cache a value
               break;
            }
         }

         // continue working with queu2Data object and cached values
      }   // end try block for queue2 tailer

   } // end outer while loop
}   // end outer try block for queue1 tailer

我已经按照上面的方法进行了尝试,并且在执行处理的函数的开头创建了两个 Tailers(在相对简单的 Java 应用程序中单击按钮时执行的私有函数)。基本上我采用了独立工作的循环,并将其放在函数的另一个循环中,预计不会出现问题。我想我在如何定位和用于读取对象方面遗漏了一些至关重要的东西,但我无法弄清楚它是什么 - 因为相同的基本代码在独立读取队列时起作用。使用isNull(context.wire())来确定队列中何时不再有对象我从其中一个示例中获得,但我不确定这是在顺序处理队列时确定队列中何时不再有对象的正确方法。

任何建议,将不胜感激。

4

2 回答 2

0

您一开始没有正确编写它。现在,有一种硬核方法可以实现您想要实现的目标(即,在较低级别上明确地做所有事情),并使用 Chronicle 提供的 MethodReader/MethodWriter 魔法。

铁杆方式

写作

// write first event type
try (DocumentContext dc = queueAppender.writingDocument()) {
    dc.wire().writeEventName("first").text("Hello first");
}
// write second event type
try (DocumentContext dc = queueAppender.writingDocument()) {
    dc.wire().writeEventName("second").text("Hello second");
}

这会将不同类型的消息写入同一个队列,您将能够在阅读时轻松区分它们。

阅读

StringBuilder reusable = new StringBuilder();
while (true) {
   try (DocumentContext dc = tailer.readingDocument()) {
       if (!dc.isPresent) {
           continue;
       }
       dc.wire().readEventName(reusable);
       if ("first".contentEquals(reusable)) {
           // handle first
       } else if ("second".contentEquals(reusable)) {
           // handle second
       }
       // optionally handle other events
   }
}

编年史之路(又名彼得的魔法)

这适用于任何可编组的类型,以及任何原始类型和 CharSequence 子类(即字符串)和字节。有关更多详细信息,请阅读 MethodReader/MethodWriter 文档。

假设您有一些数据类:

public class FirstDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
    // data fields...
}

public class SecondDataType implements Marshallable { // alternatively - extends SelfDescribingMarshallable
    // data fields...
}

然后,要将这些数据类写入队列,您只需要定义接口,如下所示:

interface EventHandler {
    void first(FirstDataType first);
    void second(SecondDataType second);
}

写作

然后,写入数据很简单:

final EventHandler writer = appender.methodWriterBuilder(EventHandler).get();
// assuming firstDatum and secondDatum are created earlier
writer.first(firstDatum);
writer.second(secondDatum);

这与硬核部分中的内容相同 - 它写入事件名称(取自方法编写器中的方法名称,即相应的“第一”或“第二”),然后是实际的数据对象。

阅读

现在,要从队列中读取这些事件,您需要提供上述接口的实现,它将处理相应的事件类型,例如:

// you implement this to read data from the queue
private class MyEventHandler implements EventHandler {
    public void first(FirstDataType first) {
        // handle first type of events
    }
    public void second(SecondDataType second) {
        // handle second type of events
    }
}

然后你阅读如下:

EventHandler handler = new MyEventHandler();
MethodReader reader = tailer.methodReader(handler);
while (true) {
    reader.readOne(); // readOne returns boolean value which can be used to determine if there's no more data, and pause if appropriate
}

杂项

您不必使用相同的界面进行阅读和写作。如果您只想读取第二种类型的事件,您可以定义另一个接口:

interface OnlySecond {
    void second(SecondDataType second);
}

现在,如果您创建一个实现此接口的处理程序并将其提供给 tailer#methodReader() 调用,则 readOne() 调用将仅处理第二种类型的事件,而跳过所有其他事件。

这也适用于 MethodWriters,即如果您有多个进程写入不同类型的数据并且一个进程消耗所有这些数据,则定义多个接口用于写入数据然后单个接口扩展所有其他接口以进行读取的情况并不少见,例如:

interface FirstOut {
    void first(String first);
}
interface SecondOut {
    void second(long second);
}
interface ThirdOut {
    void third(ThirdDataType third);
}
interface AllIn extends FirstOut, SecondOut, ThirdOut {
}

(我特意为方法参数使用了不同的数据类型来展示如何使用各种类型)

于 2022-01-11T22:07:40.687 回答
0

通过进一步的测试,我发现可以使用嵌套循环来读取包含不同 POJO 类中的数据的多个队列。上述问题中代码的问题在于,在我期望阅读的循环之外queue1Context获得了一次。我的基本误解是对象管理步进通过队列中的对象,而实际上对象在顺序读取队列时管理步进(维护索引)。queue1Data objectsDocumentContextExcerptTailer

如果它可能有助于其他人刚刚开始使用 ChronicleQueues,原始问题中的内部循环应该是:

while(true)
{
    try (final DocumentContext queue1Context = queue1Tailer() )
    {
         queue1Data = queue1Context.wire()
                                          .bytes()
                                                  .readObject(Queue1Data.class);  // first read succeeds
                                                  
         if (queue1Data.getFieldValue() > limit)   // if this fails the inner loop continues as expected
         {                                         // and second and subsequent reads now succeed
            // cache a value
               break;
         }
    }
} 

当然,最外面的包含queue1Context(在原始代码中)的 try 块应该被删除。

于 2022-01-11T22:30:01.137 回答