0

我正在尝试使用 cyclops-react 根据大小和时间对队列中的元素进行批处理,因此当没有元素时它不会阻塞

也许功能不是我所期望的,或者我做错了什么

完整的代码(Groovy)是这样的,生产者在另一个线程中:

            Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread({
        while (true) {
            sleep(1000)
            queue.offer("New message " + System.currentTimeMillis());
        }
    }).start();

    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
            .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
            .forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")})

输出是:

    [New message 1487673650332,  Batch Time: 1487673651356]
    [New message 1487673651348, New message 1487673652352,  Batch Time: 1487673653356]
    [New message 1487673653355, New message 1487673654357,  Batch Time: 1487673655362]
    [New message 1487673655362, New message 1487673656364,  Batch Time: 1487673657365]

但我期待每批中有一个元素,因为提供的元素之间的延迟是 10 秒,但批处理是每半秒

我还尝试了异步流(Groovy 代码):

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
            .async()
            .groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS)
            .peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run();

    while (true) {
        queue.offer("New message " + System.currentTimeMillis());
        sleep(1000)
    }

同样,它仅每 2 秒批处理一次,有时每批处理等待两个元素,即使批处理中的超时为半秒:

    [New message 1487673877780, Batch Time: 1487673878819]
    [New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815]
    [New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823]
    [New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828]
    [New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835]

我对非未来非惰性流进行了第三次实验,这次它奏效了。

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread({
        while (true) {
            sleep(1000)
            queue.offer("New message " + System.currentTimeMillis());
        }
    }).start();

    queue.stream()
            .groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
            .forEach({i->println(i + " Batch Time " + System.currentTimeMillis())})

结果:

    [New message 1487673288017, New message 1487673289027,  Batch Time , 1487673289055]
    [New message 1487673290029,  Batch Time , 1487673290029]
    [New message 1487673291033,  Batch Time , 1487673291033]
    [New message 1487673292037,  Batch Time , 1487673292037]

当您使用未来的流时,为什么批处理的行为似乎是错误的?

4

1 回答 1

0

差异行为是由于降低了 async.Queue 的 FutureStreams 分组效率的错误(基本上这意味着下一个结果存在于前一个的 500 毫秒限制内,并且 Stream 将向队列询问另一个值并等待它到达)。这将在 cyclops-react 的未来版本中修复。

有几种方法可以解决这个问题

  1. 使用 Jesus Menendez 在错误报告中建议的解决方法

    queue.stream()
         .groupedBySizeAndTime(batchSize, batchTimeoutMillis, TimeUnit.MILLISECONDS)
         .futureStream(new LazyReact(ThreadPools.getSequential()))
         .async()
         .peek(this::executeBatch)
         .run();
    

这避免了导致两个值一起批处理的开销。

  1. 我们可以通过使用 streamBatch 运算符在 500 毫秒后超时(而不是等到队列中的值到达进行批处理)

    Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
    new Thread(()->{
        for(int i=0;i<10;i++){
    
            queue.offer("New message " + i);
            sleep(10000);
        }
        queue.close();
    }).start();
    
    long toRun = TimeUnit.MILLISECONDS.toNanos(500l);
    
    queue.streamBatch(new Subscription(), source->{
    
        return ()->{
            List<String> result = new ArrayList<>();
    
    
               long start = System.nanoTime();
    
                   while (result.size() < 10 && (System.nanoTime() - start) < toRun) {
                       try {
                           String next = source.apply(1l, TimeUnit.MILLISECONDS);
                           if (next != null) {
                               result.add(next);
                           }
                       }catch(Queue.QueueTimeoutException e){
    
                       }
    
    
                   }
    
            start=System.nanoTime();
    
            return result;
        };
    }).filter(l->l.size()>0)
      .futureStream(new LazyReact(ThreadPools.getSequential()))
            .async()
            .peek(System.out::println)
            .run();
    

在这种情况下,我们将始终在 500 毫秒后进行分组,而不是等到我们要求的值到达队列中。

于 2017-02-22T17:02:04.333 回答