1

我正在寻找akka.stream.scaladsl.Source可以让我简单地从不同的代码位置发出下一个值的构造方法(例如,监视系统事件)。

  • 我需要类似的东西Promise。Promise 向Future. 我需要向Source.
  • monix.reactive.subjects.BehaviorSubject.onNext(_)
  • 我不太关心背压。

目前我已经使用 monix 和 akka-streams(下面的代码)实现了这个,但我希望应该只有 akka-streams 解决方案:

import akka.stream.scaladsl.{Flow, Sink, Source}
import monix.reactive.subjects.BehaviorSubject
import monix.execution.Scheduler.Implicits.global

val bs = BehaviorSubject("") //monix subject is sink and source at the same time

//this is how it is currently implemented
def createSource() = { 
    val s1 = Source.fromPublisher(bs.toReactivePublisher) //here we create source from subject
    Flow.fromSinkAndSourceCoupled[String, String](Sink.ignore, s1)
}

//somewhere else in code... some event happened
//this is how it works in monix.
val res:Future[Ack] = bs.onNext("Event #1471 just happened!") //here we emit value
4

4 回答 4

2

也许您正在寻找演员来源

文档中的一个示例:

import akka.actor.typed.ActorRef
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.{ Sink, Source }
import akka.stream.typed.scaladsl.ActorSource

trait Protocol
case class Message(msg: String) extends Protocol
case object Complete extends Protocol
case class Fail(ex: Exception) extends Protocol

val source: Source[Protocol, ActorRef[Protocol]] = ActorSource.actorRef[Protocol](completionMatcher = {
  case Complete =>
}, failureMatcher = {
  case Fail(ex) => ex
}, bufferSize = 8, overflowStrategy = OverflowStrategy.fail)

val ref = source
  .collect {
    case Message(msg) => msg
  }
  .to(Sink.foreach(println))
  .run()

ref ! Message("msg1")

这样你就可以通过actor系统向actor发送消息,这些消息将从ActorSource下游发出。

于 2020-03-05T13:48:46.933 回答
0

Source顾名思义,抽象提供了处理数据源的 API。相反,您需要查看使用数据的抽象 - Sink. 并且Sink.foreach操作是您正在寻找的,很可能是:https ://doc.akka.io/docs/akka/current/stream/operators/Sink/foreach.html

在您的情况下,代码将类似于:

import akka.stream.scaladsl.{Sink, Source}

val s1 = Source.// your WS akka stream source
s1.runWith(Sink.foreach(write))

希望这可以帮助!

于 2020-03-05T12:55:18.443 回答
0

我认为您正在寻找的是。sink.foreach它为收到的每个元素调用一个给定的过程。我认为代码将如下所示:

s1.runWith(Sink.foreach(write))

本质上,正在做的事情是对于源流,接收器尝试写入该流的每个元素。

编辑

我想你正在寻找maybe. 它会创建一个源,一旦物化的 Promise 用一个值完成就会发出。查看这个文档

编辑

futureSource也可以工作。一旦成功完成,它就会流式传输给定未来源的元素。

让我知道它是否有帮助!

于 2020-03-05T13:07:03.923 回答
0

https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.htmlhttps://doc.akka.io/docs/akka/current/stream/operators/Source/fromPublisher。 html是您所需要的,具体取决于您的 Source 从何处使用数据。

于 2020-03-05T15:35:33.810 回答