1

有以下问题:给定具有 partitionId 属性的事件列表(例如 0-10),我希望根据 paritionId 拆分传入事件,以便按接收顺序处理具有相同 partitionId 的事件. 如果分布或多或少均匀,这将导致并行处理 10 个事件(每个分区)。

除了创建 10 个单线程调度器并将事件发送到正确的调度器之外,还有没有办法使用 Project Reactor 完成上述任务?

谢谢。

4

1 回答 1

0

下面的代码

  • 将源流拆分为分区,
  • 创建 ParallelFlux,每个分区一个“轨道”,
  • 将“rails”调度到单独的线程中,
  • 收集结果

每个分区都有专用线程保证其值按原始顺序处理。

@Test
public void partitioning() throws InterruptedException {
    final int N = 10;
    Flux<Integer> source = Flux.range(1, 10000).share();
    // partition source into publishers
    Publisher<Integer>[] publishers = new Publisher[N];
    for (int i = 0; i < N; i++) {
        final int idx = i;
        publishers[idx] = source.filter(v -> v % N == idx);
    }
    // create ParallelFlux each 'rail' containing single partition
    ParallelFlux.from(publishers)
            // schedule partitions into different threads
            .runOn(Schedulers.newParallel("proc", N))
            // process each partition in its own thread, i.e. in order
            .map(it -> {
                String threadName = Thread.currentThread().getName();
                Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
                return it;
            })
            // collect results on single 'rail'
            .sequential()
            // and on single thread called 'subscriber-1'
            .publishOn(Schedulers.newSingle("subscriber"))
            .subscribe(it -> {
                String threadName = Thread.currentThread().getName();
                Assert.assertEquals("subscriber-1", threadName);
            });
    Thread.sleep(1000);
}
于 2017-01-09T16:29:41.553 回答