0

有时必须在我参与的项目中实施经典的并发生产者 - 消费者解决方案,几乎可以减少从多个线程填充一些集合并由多个消费者使用的集合。简而言之,集合说有界到 10k 个实体,一旦达到缓冲区大小,就会提交消耗这 10k 个实体的工作任务,这样的工作人员有一个限制,说它设置为 10,在最坏的情况下,这意味着我可以拥有到 10 个工人,每个工人消耗 10k 个实体。

我确实必须在这里进行一些锁定,并且对缓冲区溢出进行一些检查(当生产者在所有工作人员忙于处理其块时生成过多数据的情况)因此必须丢弃新事件以避免OOM(不是最好的解决方案,但稳定性是p1 ;))

这些天一直在寻找 reactor 和使用它的方法,而不是降低级别并执行上述所有操作,所以愚蠢的问题是:“reactor 可以用于这个用例吗?” 现在忘记溢出/丢弃..我怎样才能为广播公司实现 N 个消费者?

正在寻找带有缓冲区+线程池调度程序的广播器:

void test() {
  final Broadcaster<String> sink =   Broadcaster.create(Environment.initialize());
  Dispatcher dispatcher = Environment.newDispatcher(2048, 20, DispatcherType.WORK_QUEUE);

  sink
    .buffer(100)
    .consumeOn(dispatcher, this::log);

  for (int i=0; i<100000; i++) {
    sink.onNext("elementent " + i);
    if (i%1000 == 0) {
      System.out.println("addded elements " + i);
    }
  }
}
 void log(List<String> values) {
  System.out.print("simulating slow processing....");
  System.out.println("processing: " + Arrays.toString(values.toArray()));
  try {
    Thread.sleep(1000);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
}

我的意图是让广播公司在达到缓冲区大小时以异步方式执行 log(..),但是看起来它总是在阻塞模式下执行 log(...)。执行 100 一次完成下一个 100 等等.. 我怎样才能让它异步?

谢谢 vyvalyty

4

1 回答 1

0

一种可能的模式是将 flatMap 与 publishOn 一起使用:

Flux.range(1, 1_000_000)
.buffer(100)
.flatMap(b -> Flux.just(b).publishOn(SchedulerGroup.io())
   .doOnNext(this::log))
.consume(...);
于 2016-03-30T13:14:36.153 回答