1

Spring webflux 的 Reactor 中的 ReplaySubject (RxJava) 等价物是什么?

我需要一个热的可观察频道,它可以有 N 个订阅者并保存以前发布的元素,如 rxJava 中的 ReplaySubject。感谢您的评论,我正在使用 Spring webflux。

我在 Reactor 中找不到类似 ReplaySubject 的东西。

如果没有 ReplaySubject 的实现,我该如何实现?,我有一些代码,例如:

class ReplayFlux<T> {
private val elements: List<T> = emptyList()
private val currentSubscriptions: List<FluxSink<T>> = emptyList()

fun publish(element: T){
    elements.plus(element)
    currentSubscriptions.forEach { 
        it.next(element)
    }
}

fun subscribe(): Flux<T>{
    return Flux.create {fluxSink -> 
        currentSubscriptions.plus(fluxSink)
        elements.forEach {element -> 
            fluxSink.next(element)
        }
    }
}

fun finish(){ currentSubscriptions.forEach { it.complete() } }
}

显然这不是线程安全的,有人有更好的小鬼。如果您认为这不是那么糟糕,那么在这种情况下您应该使用哪些线程安全数据结构?

4

0 回答 0