Spring webflux 的 Reactor 中的 ReplaySubject (RxJava) 等价物是什么?
我需要一个热的可观察频道,它可以有 N 个订阅者并保存以前发布的元素,如 rxJava 中的 ReplaySubject。感谢您的评论,我正在使用 Spring webflux。
我在 Reactor 中找不到类似 ReplaySubject 的东西。
如果没有 ReplaySubject 的实现,我该如何实现?,我有一些代码,例如:
class ReplayFlux<T> {
private val elements: List<T> = emptyList()
private val currentSubscriptions: List<FluxSink<T>> = emptyList()
fun publish(element: T){
elements.plus(element)
currentSubscriptions.forEach {
it.next(element)
}
}
fun subscribe(): Flux<T>{
return Flux.create {fluxSink ->
currentSubscriptions.plus(fluxSink)
elements.forEach {element ->
fluxSink.next(element)
}
}
}
fun finish(){ currentSubscriptions.forEach { it.complete() } }
}
显然这不是线程安全的,有人有更好的小鬼。如果您认为这不是那么糟糕,那么在这种情况下您应该使用哪些线程安全数据结构?