0

在查看WiretapConnectorSpring 框架的源代码时,我偶然发现了一个 type 的对象MonoProcessor。我尝试用谷歌搜索它的用途解释,但无济于事。

Javadoc 对 Reactive/Reactor 外行说的不多:

AMonoProcessor是实现有状态语义的 Mono 扩展。允许多订阅。一旦 aMonoProcessor得到解决,新的订阅者将从缓存的结果中受益。

最后一句暗示计算的结果被缓存了,这似乎是MonoProcessor这段代码中的用法。

有人可以澄清一下预期的用例是MonoProcessor什么以及为什么首先引入它?

4

1 回答 1

1

用例是您想要一个创建“热”单声道的处理器,而不是通量。还提供诸如取消、处置、onNext 等处理器功能。由于 Mono 是单个值,因此它只能消耗单个 onNext,因此结果会被缓存以供将来的任何订阅使用。有效地将其从“热”变为“冷”。

热单声道示例

   //Set up flux processor, sink for thread safety
    DirectProcessor<Integer> directProcessor = DirectProcessor.create();
    FluxSink<Integer> sink = directProcessor.serialize().sink();

    //Allows dynamic creation of Mono value, after initialisation
    MonoProcessor<Integer> processor =
            directProcessor.filter(s -> s > 5)
            .next()
            .toProcessor();

     //Set up subscriptions, no values have been submitted to either yet
     processor.map(i -> "monoProc: " + i).subscribe(System.out::println);
     directProcessor.map(i -> "DirectProc: " + i).subscribe(System.out::println);

    //Uncomment and above Mono subscription will never occur
    //processor.cancel();

    //Values from some other service or whatever
    for (int i = 0; i < 10; i++) {
        sink.next(i);
    }

    //Do something later with cached result
    processor.map(i -> "monoProc cached: " + i).subscribe(System.out::println);
于 2020-05-19T19:01:15.467 回答