我已将其定义为全局:
Processor<Integer, Integer> p = RingBufferProcessor.create("test", 32);
Stream<List<Integer>> s = Streams.wrap(p).distinct().buffer(5, TimeUnit.SECONDS).log().unbounded();
在构造函数上:
s.consume(i -> System.err.println(Thread.currentThread() + " data=" + i));
现在我两次调用这个函数:
for (int i = 0; i < 1000; i++) {
p.onNext(i % 3);
}
独特的作品很好,我第一次消费。当我再次调用此方法时,他仍然记得不同的并且不会触发消耗。
每次我们消费后是否有任何清洁选项。我需要实现的想法是,我将缓冲所有输入,并且每次我只会消耗唯一的项目..
有人有什么想法吗?
肿瘤坏死因子