有以下问题:给定具有 partitionId 属性的事件列表(例如 0-10),我希望根据 paritionId 拆分传入事件,以便按接收顺序处理具有相同 partitionId 的事件. 如果分布或多或少均匀,这将导致并行处理 10 个事件(每个分区)。
除了创建 10 个单线程调度器并将事件发送到正确的调度器之外,还有没有办法使用 Project Reactor 完成上述任务?
谢谢。
有以下问题:给定具有 partitionId 属性的事件列表(例如 0-10),我希望根据 paritionId 拆分传入事件,以便按接收顺序处理具有相同 partitionId 的事件. 如果分布或多或少均匀,这将导致并行处理 10 个事件(每个分区)。
除了创建 10 个单线程调度器并将事件发送到正确的调度器之外,还有没有办法使用 Project Reactor 完成上述任务?
谢谢。
下面的代码
每个分区都有专用线程保证其值按原始顺序处理。
@Test
public void partitioning() throws InterruptedException {
final int N = 10;
Flux<Integer> source = Flux.range(1, 10000).share();
// partition source into publishers
Publisher<Integer>[] publishers = new Publisher[N];
for (int i = 0; i < N; i++) {
final int idx = i;
publishers[idx] = source.filter(v -> v % N == idx);
}
// create ParallelFlux each 'rail' containing single partition
ParallelFlux.from(publishers)
// schedule partitions into different threads
.runOn(Schedulers.newParallel("proc", N))
// process each partition in its own thread, i.e. in order
.map(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("proc-" + (it % 10 + 1), threadName);
return it;
})
// collect results on single 'rail'
.sequential()
// and on single thread called 'subscriber-1'
.publishOn(Schedulers.newSingle("subscriber"))
.subscribe(it -> {
String threadName = Thread.currentThread().getName();
Assert.assertEquals("subscriber-1", threadName);
});
Thread.sleep(1000);
}