用例是您想要一个创建“热”单声道的处理器,而不是通量。还提供诸如取消、处置、onNext 等处理器功能。由于 Mono 是单个值,因此它只能消耗单个 onNext,因此结果会被缓存以供将来的任何订阅使用。有效地将其从“热”变为“冷”。
热单声道示例
//Set up flux processor, sink for thread safety
DirectProcessor<Integer> directProcessor = DirectProcessor.create();
FluxSink<Integer> sink = directProcessor.serialize().sink();
//Allows dynamic creation of Mono value, after initialisation
MonoProcessor<Integer> processor =
directProcessor.filter(s -> s > 5)
.next()
.toProcessor();
//Set up subscriptions, no values have been submitted to either yet
processor.map(i -> "monoProc: " + i).subscribe(System.out::println);
directProcessor.map(i -> "DirectProc: " + i).subscribe(System.out::println);
//Uncomment and above Mono subscription will never occur
//processor.cancel();
//Values from some other service or whatever
for (int i = 0; i < 10; i++) {
sink.next(i);
}
//Do something later with cached result
processor.map(i -> "monoProc cached: " + i).subscribe(System.out::println);