我正在尝试使用 cyclops-react 根据大小和时间对队列中的元素进行批处理,因此当没有元素时它不会阻塞
也许功能不是我所期望的,或者我做错了什么
完整的代码(Groovy)是这样的,生产者在另一个线程中:
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
new Thread({
while (true) {
sleep(1000)
queue.offer("New message " + System.currentTimeMillis());
}
}).start();
StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
.groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
.forEach({i->println(i + " Batch Time: ${System.currentTimeMillis()}")})
输出是:
[New message 1487673650332, Batch Time: 1487673651356]
[New message 1487673651348, New message 1487673652352, Batch Time: 1487673653356]
[New message 1487673653355, New message 1487673654357, Batch Time: 1487673655362]
[New message 1487673655362, New message 1487673656364, Batch Time: 1487673657365]
但我期待每批中有一个元素,因为提供的元素之间的延迟是 10 秒,但批处理是每半秒
我还尝试了异步流(Groovy 代码):
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
StreamSource.futureStream(queue, new LazyReact(ThreadPools.queueCopyExecutor))
.async()
.groupedBySizeAndTime(10, 500,TimeUnit.MILLISECONDS)
.peek({i->println(i + "Batch Time: ${System.currentTimeMillis()}")}).run();
while (true) {
queue.offer("New message " + System.currentTimeMillis());
sleep(1000)
}
同样,它仅每 2 秒批处理一次,有时每批处理等待两个元素,即使批处理中的超时为半秒:
[New message 1487673877780, Batch Time: 1487673878819]
[New message 1487673878811, New message 1487673879812, Batch Time: 1487673880815]
[New message 1487673880814, New message 1487673881819, Batch Time: 1487673882823]
[New message 1487673882823, New message 1487673883824, Batch Time: 1487673884828]
[New message 1487673884828, New message 1487673885831, Batch Time: 1487673886835]
我对非未来非惰性流进行了第三次实验,这次它奏效了。
Queue<String> queue = QueueFactories.<String>unboundedQueue().build();
new Thread({
while (true) {
sleep(1000)
queue.offer("New message " + System.currentTimeMillis());
}
}).start();
queue.stream()
.groupedBySizeAndTime(10,500,TimeUnit.MILLISECONDS)
.forEach({i->println(i + " Batch Time " + System.currentTimeMillis())})
结果:
[New message 1487673288017, New message 1487673289027, Batch Time , 1487673289055]
[New message 1487673290029, Batch Time , 1487673290029]
[New message 1487673291033, Batch Time , 1487673291033]
[New message 1487673292037, Batch Time , 1487673292037]
当您使用未来的流时,为什么批处理的行为似乎是错误的?