我正在寻找akka.stream.scaladsl.Source
可以让我简单地从不同的代码位置发出下一个值的构造方法(例如,监视系统事件)。
- 我需要类似的东西
Promise
。Promise 向Future
. 我需要向Source
. - 像
monix.reactive.subjects.BehaviorSubject.onNext(_)
- 我不太关心背压。
目前我已经使用 monix 和 akka-streams(下面的代码)实现了这个,但我希望应该只有 akka-streams 解决方案:
import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global
val bs = BehaviorSubject("") //monix subject is sink and source at the same time
//this is how it is currently implemented
def createSource() = {
val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}
//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value